Hi everyone,

This is the *last call for feedback* on this FLIP. Otherwise, I will start a VOTE tomorrow.

@Wenlong: Thanks for offering your help. Once the FLIP has been accepted, I will create a list of subtasks that we can split among contributors. Many can be implemented in parallel.

Regards,
Timo


On 13.12.21 09:20, wenlong.lwl wrote:
Hi, Timo, +1 for the improvement too. Thanks for the great job.

Looking forward to the next steps of the FLIP; I could help with the development of some of the specific improvements.

Best,
Wenlong

On Mon, 13 Dec 2021 at 14:43, godfrey he <godfre...@gmail.com> wrote:

Hi Timo,

+1 for the improvement.

Best,
Godfrey

On Fri, 10 Dec 2021 at 20:37, Timo Walther <twal...@apache.org> wrote:

Hi Wenlong,

yes it will. Sorry for the confusion. This is a logical consequence of
the assumption:

The JSON plan contains no implementation details (i.e. no classes) and
is fully declarative.

I will add a remark.

Thanks,
Timo


On 10.12.21 11:43, wenlong.lwl wrote:
hi, Timo, thanks for the explanation. I totally agree with what you said.
My actual question is: will the version of an exec node be serialized in the JSON plan? In my understanding, it was not in the former design. If the answer is yes, my question is already solved.


Best,
Wenlong


On Fri, 10 Dec 2021 at 18:15, Timo Walther <twal...@apache.org> wrote:

Hi Wenlong,

I also thought about adding a `flinkVersion` field per ExecNode. But this is not necessary, because the `version` of the ExecNode has the same purpose.

The plan version just encodes: "plan has been updated in Flink 1.17" / "plan is entirely valid for Flink 1.17".

The ExecNode version maps to `minStateVersion` to verify state compatibility.

So even if the plan version is 1.17, some ExecNodes may use the state layout of 1.15.

It is totally fine to only update the ExecNode to version 2 and not 3 in your example.
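A minimal sketch of the compatibility check described above (plan version vs. per-node `minStateVersion`); all class and method names here are made up for illustration and are not part of the FLIP:

```java
import java.util.List;

public class StateVersionCheck {

    // Illustrative stand-in for an ExecNode entry in the plan.
    record ExecNodeRef(String name, int version, String minStateVersion) {}

    /** A plan is restorable if no node requires a newer state layout than the runtime supports. */
    static boolean isRestorable(String runtimeFlinkVersion, List<ExecNodeRef> nodes) {
        return nodes.stream()
                .allMatch(n -> compare(n.minStateVersion(), runtimeFlinkVersion) <= 0);
    }

    // Compares dotted version strings numerically, e.g. compare("1.15", "1.17") < 0.
    static int compare(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        for (int i = 0; i < Math.max(pa.length, pb.length); i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) return Integer.compare(x, y);
        }
        return 0;
    }
}
```

With this model, a plan carrying flinkVersion 1.17 can contain nodes whose state layout is still the 1.15 one, exactly as described above.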

Regards,
Timo



On 10.12.21 06:02, wenlong.lwl wrote:
Hi, Timo, thanks for updating the doc.

I have a comment on plan migration: I think we may need to add a version field for every exec node when serializing. In earlier discussions, I think we reached the conclusion that the version of the plan is treated as the version of each node, but in this case that would be broken.
Take the following example in the FLIP into consideration; there is a bad case: when in 1.17 we introduce an incompatible version 3 and drop version 1, we can only update the version to 2, so the version should be per exec node.

ExecNode version *1* is not supported anymore, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely replace the old version *1* with *2*. The JSON plan flinkVersion changes to 1.17.


Best,

Wenlong

On Thu, 9 Dec 2021 at 18:36, Timo Walther <twal...@apache.org> wrote:

Hi Jing and Godfrey,

I had another iteration over the document. There are two major changes:

1. Supported Flink Upgrade Versions

I got the feedback via various channels that a step size of one minor version is not very convenient. As you said, "upgrading to a new version is a time-consuming process". I rephrased this section:

Upgrading usually involves work, which is why many users perform this task rarely (e.g. only once per year). Also, skipping versions is common until a new feature has been introduced for which it is worth upgrading. We will support the upgrade to the most recent Flink version from a set of previous versions. We aim to support upgrades from the last 2-3 releases on a best-effort basis; maybe even more depending on the maintenance overhead. However, in order to not grow the testing matrix infinitely and to perform important refactorings if necessary, we only guarantee upgrades with a step size of a single minor version (i.e. a cascade of upgrades).

2. Annotation Design

I also adopted the multiple-annotations design for the previous supportPlanFormat, so there is no array of versions anymore. I reworked the section; please have a look at the updated examples:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-ExecNodeTests

I also got the feedback offline that `savepoint` might not be the right terminology for the annotation. I changed that to minPlanVersion and minStateVersion.

Let me know what you think.

Regards,
Timo



On 09.12.21 08:44, Jing Zhang wrote:
Hi Timo,
Thanks a lot for driving this discussion. I believe it could solve many problems that we are suffering from in upgrading.

I only have a small complaint about the following point:

For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

In our internal production environment, we follow up with the community's latest stable release version almost once a year, because upgrading to a new version is a time-consuming process. So we might skip 1-3 versions when we upgrade to the latest version. This might happen in other companies too. Could we guarantee that FLIP-190 works if we skip fewer minor versions than a specified threshold? Then we would know which version is good for us when preparing an upgrade.

Best,
Jing Zhang

On Wed, 8 Dec 2021 at 22:16, godfrey he <godfre...@gmail.com> wrote:

Hi Timo,

Thanks for the explanation, it's much clearer now.

One thing I want to confirm about `supportedPlanFormat` and `supportedSavepointFormat`: `supportedPlanFormat` supports multiple versions, while `supportedSavepointFormat` supports only one version?
A JSON plan can be deserialized by multiple versions because default values will be set for new fields. In theory, a savepoint can be restored by more than one version of the operators even if a state layout has changed, such as by deleting a whole state and starting the job with `allowNonRestoredState`=true. I think this is a corner case, and it's hard to understand compared to `supportedPlanFormat` supporting multiple versions. So, for most cases, when the state layout is changed, the savepoint is incompatible, and `supportedSavepointFormat` and the version need to be changed.

I think we need a detailed explanation of the annotation change story in the Javadoc of the `ExecNodeMetadata` class for all developers (esp. those unfamiliar with this part).

Best,
Godfrey

On Wed, 8 Dec 2021 at 16:57, Timo Walther <twal...@apache.org> wrote:

Hi Wenlong,

thanks for the feedback. Great that we reached consensus here. I
will
update the entire document with my previous example shortly.

> if we don't update the version when the plan format changes, we can't detect that the plan can't be deserialized in 1.15

This should not be a problem, as the entire plan file has a version as well. We should not allow reading a 1.16 plan in 1.15. We can throw a helpful exception early.

Reading a 1.15 plan in 1.16 is possible until we drop the old `supportedPlanFormat` from one of the used ExecNodes. Afterwards, all `supportedPlanFormat`s of ExecNodes must be equal to or higher than the plan version.

Regards,
Timo

On 08.12.21 03:07, wenlong.lwl wrote:
Hi, Timo, +1 for multi metadata.

The compatible change I mentioned in the last email is the slight state change example you gave, so we have actually reached consensus on this, IMO.

Another question based on the example you gave: in the example "JSON node gets an additional property in 1.16", if we don't update the version when the plan format changes, we can't detect that the plan can't be deserialized in 1.15, although the savepoint state is compatible. The error message may not be so friendly if we just throw a deserialization failure.

On Tue, 7 Dec 2021 at 16:49, Timo Walther <twal...@apache.org> wrote:

Hi Wenlong,

> First, we add a newStateLayout because of some improvement in state; in order to keep compatibility we may still keep the old state for the first version. We need to update the version, so that we can generate a new-version plan for the new job and keep the exec node compatible with the old-version plan.

The problem that I see here for contributors is that the actual update of a version is more complicated than just updating an integer value. It means copying a lot of ExecNode code for a change that happens locally in an operator. Let's assume multiple ExecNodes use a similar operator. Why do we need to update all ExecNode versions if the operator itself can deal with the incompatibility? The ExecNode version is meant for topology changes or fundamental state changes.

If we don't find consensus on this topic, I would at least vote for supporting multiple annotations for an ExecNode class. This way we don't need to copy code but only add two ExecNode annotations with different ExecNode versions.

> Maybe we can add support for this case: when an exec node is changed in 1.16, but is compatible with 1.15, we can use the node of 1.16 to deserialize the plan of 1.15.

If the ExecNode is compatible, there is no reason to increase the ExecNode version.



I tried to come up with a reworked solution to make all parties happy:

1. Let's assume the following annotations:

supportedPlanFormat = [1.15]

supportedSavepointFormat = 1.15

We drop `added` as it is equal to `supportedSavepointFormat`.

2. Multiple annotations over ExecNodes are possible:

// operator state changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)

// state layout changed slightly in 1.16
// - operator migration is possible
// - operator supports state of both versions and will perform operator state migration
// - new plans will get the new ExecNode version
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, supportedSavepointFormat=1.16)

// we force a plan migration in 1.17
// - we assume that all operator states have been migrated in the previous version
// - we can safely replace the old version `1` with `2` and only keep the new savepoint format
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, supportedSavepointFormat=1.16)


// plan changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)

// JSON node gets an additional property in 1.16
// e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false }
// - ExecNode version does not change
// - ExecNode version only changes when topology or state is affected
// - we support both JSON plan formats, the old and the newest one
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16], supportedSavepointFormat=1.15)

// we force a plan migration in 1.17
// - now we only support the 1.16 plan format
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16, supportedSavepointFormat=1.15)


// topology change

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)

// complete new class structure in 1.16, annotated with
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, supportedSavepointFormat=1.16)
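As an illustration of the multiple-annotations idea, here is a hypothetical sketch (not Flink's actual annotation classes) of how such a repeatable `@ExecNodeMetadata` could be declared and applied with Java's `@Repeatable` mechanism:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Repeatable annotation carrying the metadata fields from the proposal above.
@Repeatable(ExecNodeMetadatas.class)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecNodeMetadata {
    String name();
    int version();
    String[] supportedPlanFormat();
    String supportedSavepointFormat();
}

// Container annotation required by @Repeatable.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecNodeMetadatas {
    ExecNodeMetadata[] value();
}

// The "state layout changed slightly in 1.16" case: both versions coexist
// on one class, so no ExecNode code needs to be copied.
@ExecNodeMetadata(name = "A", version = 1,
        supportedPlanFormat = {"1.15"}, supportedSavepointFormat = "1.15")
@ExecNodeMetadata(name = "A", version = 2,
        supportedPlanFormat = {"1.15"}, supportedSavepointFormat = "1.16")
class StreamExecNodeA {}
```

The planner could then read all versions of a node via `getAnnotationsByType(ExecNodeMetadata.class)` instead of maintaining one class per version.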



What do you think?


Regards,
Timo









On 07.12.21 08:20, wenlong.lwl wrote:
Maybe we can add support for this case: when an exec node is changed in 1.16 but is compatible with 1.15, we can use the node of 1.16 to deserialize the plan of 1.15. This way, we don't need to fork the code if the change is compatible, and can avoid forking code frequently.


Best,
Wenlong


On Tue, 7 Dec 2021 at 15:08, wenlong.lwl <wenlong88....@gmail.com> wrote:

hi, Timo, I would prefer to update the version every time we change the state layout too.

It could be possible that we change the exec node in 2 steps:
First, we add a newStateLayout because of some improvement in state; in order to keep compatibility we may still keep the old state for the first version. We need to update the version, so that we can generate a new-version plan for the new job and keep the exec node compatible with the old-version plan.
After some versions, we may remove the old-version state layout and clean up the deprecated code. We still need to update the version, so that we can verify that we are compatible with the plan after the first change, but not compatible with earlier plans.


Best,
Wenlong

On Mon, 6 Dec 2021 at 21:27, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

> design makes things more complex.

Yes, the design might be a bit more complex. But operator migration is way easier than ExecNode migration at a later point in time for code maintenance. We know that ExecNodes can become pretty complex. Even though we have put a lot of code into `CommonXXExecNode`, it will be a lot of work to maintain multiple versions of ExecNodes. If we can avoid this with operator state migration, it should always be preferred over a new ExecNode version.

I'm aware that operator state migration might only be important for roughly 10% of all changes. A new ExecNode version will be used for 90% of all changes.

> If there are multiple state layouts, which layout should the ExecNode use?

It is not the responsibility of the ExecNode to decide this but the operator. Something like:

class X extends ProcessFunction {

    ValueState<A> oldStateLayout;
    ValueState<B> newStateLayout;

    void open() {
        // migrate the old layout once before switching to the new one
        if (oldStateLayout.get() != null) {
            performOperatorMigration();
        }
        useNewStateLayout();
    }
}

Operator migration is meant for smaller, "more local" changes that do not touch the ExecNode layer. The CEP library and DataStream API sources have been performing operator migration for years already.


> `supportedPlanChanges` and `supportedSavepointChanges` are a bit obscure.

Let me try to come up with more examples of why I think both annotations make sense and are esp. important *for test coverage*.

supportedPlanChanges:

Let's assume we have some JSON in Flink 1.15:

{
    some-prop: 42
}

And we want to extend the JSON in Flink 1.16:

{
    some-prop: 42,
    some-flag: false
}

Maybe we don't need to increase the ExecNode version but only ensure that the flag is set to `false` by default for the older versions.

We need a location to track changes and document the changelog. With the help of the annotation supportedPlanChanges = [1.15, 1.16] we can verify that we have tests for both JSON formats.

And once we decide to drop the 1.15 format, we enforce plan migration, fill in the default value `false` into the old plans, and bump their JSON plan version to 1.16 or higher.
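The "fill in the default value" step of such a plan migration can be sketched with a toy model that treats a JSON node as a map. The property names come from the example above; everything else is illustrative, not the FLIP's implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class PlanNodeMigration {

    /**
     * Migrates a 1.15-format node to the 1.16 format by filling in the
     * new `some-flag` property with its documented default, `false`.
     * Nodes that already carry the flag are left untouched.
     */
    static Map<String, Object> migrateTo116(Map<String, Object> node) {
        Map<String, Object> migrated = new HashMap<>(node);
        migrated.putIfAbsent("some-flag", false);
        return migrated;
    }
}
```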



> once the state layout is changed, the ExecNode version also needs to be updated

This will still be the majority of cases. But if we can avoid it, we should, so as not to have too much duplicate code to maintain.



Thanks,
Timo


On 06.12.21 09:58, godfrey he wrote:
Hi, Timo,

Thanks for the detailed explanation.

> We change an operator state of B in Flink 1.16. We perform the change in the operator of B in a way to support both state layouts. Thus, no need for a new ExecNode version.

I think this design makes things more complex.
1. If there are multiple state layouts, which layout should the ExecNode use? It increases the cost of understanding for developers (especially for Flink newcomers), making them prone to mistakes.
2. `supportedPlanChanges` and `supportedSavepointChanges` are a bit obscure.

The purpose of the ExecNode annotations is not only to support powerful validation, but more importantly to make it easy for developers to understand, to ensure that every modification is easy and state compatible.

I prefer that, once the state layout is changed, the ExecNode version also needs to be updated, which would make things simple. How about renaming `supportedPlanChanges` to `planCompatibleVersion` (which means the plan is compatible with the plan generated by the node of the given version), and renaming `supportedSavepointChanges` to `savepointCompatibleVersion` (which means the state is compatible with the state generated by the node of the given version)? The names also indicate that only one version value can be set.

WDYT?

Best,
Godfrey









On Thu, 2 Dec 2021 at 23:42, Timo Walther <twal...@apache.org> wrote:

Response to Marios's feedback:

> there should be some good logging in place when the upgrade is taking place

Yes, I agree. I added this part to the FLIP.

> config option instead that doesn't provide the flexibility to overwrite certain plans

One can also set the config option around sections of a multi-statement SQL script:

SET 'table.plan.force-recompile'='true';

COMPILE ...

SET 'table.plan.force-recompile'='false';

But the question is why a user would want to run COMPILE multiple times. If it is during development, then running EXECUTE (or just the statement itself) without calling COMPILE should be sufficient. The file can also be deleted manually if necessary.

What do you think?

Regards,
Timo



On 02.12.21 16:09, Timo Walther wrote:
Hi Till,

Yes, you might have to. But not a new plan from the SQL query; rather a migration from the old plan to the new plan. This will not happen often, but we need a way to evolve the format of the JSON plan itself.

Maybe this confuses a bit, so let me clarify it again: mostly ExecNode versions and operator state layouts will evolve, not the plan files; those will be pretty stable. But also not infinitely.

Regards,
Timo


On 02.12.21 16:01, Till Rohrmann wrote:
Then for migrating from Flink 1.10 to 1.12, I might have to create a new plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?

Cheers,
Till

On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:

Response to Till's feedback:

> compiled plan won't be changed after being written initially

This is not entirely correct. We give guarantees for keeping the query up and running. We reserve the right to force plan migrations. In this case, the plan might not be created from the SQL statement but from the old plan. I have added an example in section 10.1.1. In general, both persisted entities, "plan" and "savepoint", can evolve independently of each other.

Thanks,
Timo

On 02.12.21 15:10, Timo Walther wrote:
Response to Godfrey's feedback:

> "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.

Thanks for the hint. I added a dedicated section 7.1.3.

> it's hard to maintain the supported versions for "supportedPlanChanges" and "supportedSavepointChanges"

Actually, I think we are mostly on the same page.

The annotation does not need to be updated for every Flink version. As the name suggests, it is about "changes" (in other words: incompatibilities) that require some kind of migration: either plan migration (= PlanChanges) or savepoint migration (= SavepointChanges, using operator migration or savepoint migration).

Let's assume we introduced two ExecNodes A and B in Flink 1.15.

The annotations are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

We change an operator state of B in Flink 1.16.

We perform the change in the operator of B in a way to support both state layouts. Thus, no need for a new ExecNode version.

The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=[1.15, 1.16])

So the versions in the annotations are "start versions".

I don't think we need end versions? An end version would mean that we drop the ExecNode from the code base.

Please check section 10.1.1 again. I added a more complex example.


Thanks,
Timo



On 01.12.21 16:29, Timo Walther wrote:
Response to Francesco's feedback:

> *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan must fail hard

Yes, I totally agree. We will fail hard with a helpful exception. Any mistake, e.g. using an inline object in Table API or an invalid DataStream API source without a uid, should immediately fail a plan compilation step. I added a remark to the FLIP again.

> What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats

Breaking changes in connectors and formats need to be encoded in the options. I could also imagine versioning in the factory identifier, e.g. `connector=kafka` and `connector=kafka-2`, if this becomes necessary.

After thinking about your question again, I think we will also need the same testing infrastructure for our connectors and formats, esp. restore tests and completeness tests. I updated the document accordingly. I also added a way to generate UIDs for DataStream API providers.

> *Functions:* Are we talking about the function name or the function's complete signature?

For catalog functions, the identifier contains the catalog name and the database name. For system functions, the identifier contains only a name, which makes function name and identifier identical. I reworked the section again and also fixed some of the naming conflicts you mentioned.

> we should perhaps use a logically defined unique id like /bigIntToTimestamp/

I added a concrete example for the resolution and restoration. The unique id is composed of name + version. Internally, this is represented as `$TO_TIMESTAMP_LTZ$1`.
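A tiny sketch of that composition (the `$name$version` rule is inferred from the example above and is an assumption, not a confirmed implementation detail):

```java
public class FunctionIdentifier {

    /** Composes the internal unique id of a versioned system function from name + version. */
    static String internalId(String name, int version) {
        return "$" + name + "$" + version;
    }
}
```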

> I think we should rather keep JSON out of the concept

Sounds ok to me. In SQL we also just call it "plan". I will change the file sections, but I would suggest keeping the fromJsonString method.

> write it back in the original plan file

I updated the terminology section for what we consider an "upgrade". We might need to update the original plan file. This is already considered in COMPILE PLAN ... FROM ..., even though this is future work. Also savepoint migration.

Thanks for all the feedback!

Timo


On 30.11.21 14:28, Timo Walther wrote:
Response to Wenlong's feedback:

> I would prefer not to provide such a shortcut; let users use COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by new users even without referring to the docs.

I would like to hear more opinions on this topic. Personally, I find a combined statement very useful, not only for quicker development and debugging but also for readability. It helps keep the JSON path and the query close to each other in order to know the origin of the plan.

> but the plan and SQL are not matched. The result would be quite confusing if we still execute the plan directly; we may need to add a validation.

You are right that there could be a mismatch. But we have a similar problem when executing CREATE TABLE IF NOT EXISTS: the schema or options of a table could have changed completely in the catalog, but the CREATE TABLE IF NOT EXISTS is not executed again. So a mismatch could also occur there.

Regards,
Timo

On 30.11.21 14:17, Timo Walther wrote:
Hi everyone,

thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

> Will the JSON plan's schema be considered an API?

No, not in the first version. This is explicitly mentioned in the `General JSON Plan Assumptions`. I tried to improve the section once more to make it clearer. However, the JSON plan is definitely stable per minor version. And since the plan is versioned by Flink version, external tooling could be built around it. We might make it public API once the design has settled.

> Given that upgrades across multiple versions at once are unsupported, do we verify this somehow?

Good question. I extended the `General JSON Plan Assumptions`. Now yes: the Flink version is part of the JSON plan and will be verified during restore. But keep in mind that we might support more than just the last version, at least until the JSON plan has been migrated.

Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:
I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go with the config option instead, that doesn't provide the flexibility to overwrite certain plans but not others, since the config applies globally. Isn't that something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com> wrote:

Hi Timo!

Thanks a lot for taking all that time and effort to put together this proposal!

Regarding:

For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

I think that for this first step we should make it absolutely clear to the users that they would need to go through all intermediate versions to end up with the target version they wish. If we are to support skipping versions in the future, i.e. upgrading from 1.14 to 1.17, this means that we need to have a testing infrastructure in place that tests all possible combinations of version upgrades, i.e. from 1.14 to 1.15, from 1.14 to 1.16 and so forth, while still testing and of course supporting all the upgrades from the previous minor version.

I like a lot the idea of introducing HINTS to define some behaviour in the programs!
- The hints live together with the SQL statements and consequently the (JSON) plans.
- If multiple queries are involved in a program, each one of them can define its own config (regarding plan optimisation, not-null enforcement, etc.)

I agree with Francesco on his argument regarding the *JSON* plan. I believe we should already provide flexibility here, since (who knows) in the future a JSON plan might not fulfil the desired functionality.

I also agree that we need some very obvious way (i.e. not a log entry) to show the users that their program doesn't support version upgrades, and prevent them from being negatively surprised in the future when trying to upgrade their production pipelines.

This is an implementation detail, but I'd like to add that there should be some good logging in place when the upgrade is taking place, to be able to track every restoration action and help debug any potential issues arising from that.





On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for writing this FLIP, Timo. I think this will be a very important improvement for Flink and our SQL users :-)

Similar to Francesco, I would like to understand the statement

For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

a bit better. Is it because Flink does not guarantee that a savepoint created by version 1.x can be directly recovered by version 1.y with x + 1 < y, so users might have to go through a cascade of upgrades? From how I understand your proposal, the compiled plan won't be changed after being written initially. Hence, I would assume that for the plan alone Flink will have to give backwards compatibility guarantees for all versions. Am I understanding this part correctly?

Cheers,
Till

On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <france...@ververica.com> wrote:

Hi Timo,

Thanks for putting this amazing work together. I have some considerations/questions about the FLIP:

*Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan fails hard, so users don't bite themselves with such issues, or at least we need to output warning logs. In general, whenever the user is trying to use the CompiledPlan APIs and, at the same time, trying to do something "illegal" for the plan, we should immediately either log or fail depending on the issue, in order to avoid any surprises once the user upgrades. I would also say the same for things like registering a function, registering a DataStream, and every other thing which won't end up in the plan; we should log such info to the user by default.

*General JSON Plan Assumptions #9:* When thinking of connectors and formats, I think it's reasonable to assume, and keep out of the feature design, that no feature/ability can be deleted from a connector/format. I also don't think new features/abilities can influence this FLIP, given the plan is static. So if, for example, MyCoolTableSink implements SupportsProjectionPushDown in the next Flink version, then it shouldn't be a problem for the upgrade story, since the plan is still configured as computed from the previous Flink version. What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats. Although this argument doesn't seem relevant for the connectors shipped by the Flink project itself, because we try to keep them as stable as possible and avoid eventual breaking changes, it's compelling for external connectors and formats, which might be decoupled from the Flink release cycle and might have different backward compatibility guarantees. It's totally reasonable if we don't want to tackle it in this first iteration of the feature, but it's something we need to keep in mind for the future.

*Functions:* It's not clear to me what you mean by "identifier", because elsewhere in the same context you talk about "name". Are we talking about the function name or the function's complete signature? Let's assume for example we have these function definitions:

* TO_TIMESTAMP_LTZ(BIGINT)
* TO_TIMESTAMP_LTZ(STRING)
* TO_TIMESTAMP_LTZ(STRING, STRING)

These for me are very different functions with different implementations, where each of them might evolve separately at a different pace. Hence, when we store them in the JSON plan we should perhaps use a logically defined unique id like /bigIntToTimestamp/, /stringToTimestamp/ and /stringToTimestampWithFormat/. This also solves the issue of correctly referencing the functions when restoring the plan, without running the inference logic again (which might have changed in the meantime), and it might also solve the versioning; that is, the function identifier can contain the function version, like /stringToTimestampWithFormat_1_1/ or /stringToTimestampWithFormat_1_2/. An alternative could be to use the string signature representation, which might not be trivial to compute, given the complexity of our type inference logic.

*The term "JSON plan"*: I think we should rather keep JSON out of the concept and just name it "Compiled Plan" (like the proposed API) or something similar, as I can see how in the future we might decide to support/modify our persistence format to something more storage-efficient, like BSON. For example, I would rename /CompiledPlan.fromJsonFile/ to simply /CompiledPlan.fromFile/.

*Who is the owner of the plan file?* I asked myself this question when reading this:

For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

My understanding of this statement is that a user can upgrade between minors and then, following all the minors, the same query can remain up and running. E.g. I upgrade from 1.15 to 1.16, and then from 1.16 to 1.17, and I still expect my original query to work without recomputing the plan. This necessarily means that at some point in future releases we'll need some basic "migration" tool to keep the queries up and running, which ends up modifying the compiled plan. So I guess Flink should write it back to the original plan file, perhaps making a backup of the previous one? Can you please clarify this aspect?

Except for these considerations, the proposal looks good to me and I'm eagerly waiting to see it in play.

Thanks,
FG


--
Francesco Guardiani | Software Engineer
france...@ververica.com[1]

Follow us @VervericaData
--
Join Flink Forward[2] - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park Tung Jason, Jinwei (Kevin) Zhang

--------
[1] mailto:france...@ververica.com
[2] https://flink-forward.org/




--
Marios
































