Hi Godfrey,

> design makes things more complex.

Yes, the design might be a bit more complex. But for long-term code maintenance, operator migration is way easier than ExecNode migration. We know that ExecNodes can become pretty complex. Even though we have put a lot of code into `CommonXXExecNode`, it will be a lot of work to maintain multiple versions of ExecNodes. If we can avoid this with operator state migration, that should always be preferred over a new ExecNode version.

I'm aware that operator state migration might only be important for roughly 10% of all changes; a new ExecNode version will be used for the other 90%.

> If there are multiple state layouts, which layout the ExecNode should use?

It is not the responsibility of the ExecNode to decide this but of the operator. Something like:

class X extends ProcessFunction<I, O> {

  private ValueState<A> oldStateLayout;
  private ValueState<B> newStateLayout;

  public void processElement(I input, Context ctx, Collector<O> out) throws Exception {
    // keyed state is only accessible with a key context, so the check
    // runs lazily per key rather than in open()
    if (oldStateLayout.value() != null) {
      performOperatorMigration();
    }
    useNewStateLayout();
  }
}
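
Here, performOperatorMigration() and useNewStateLayout() are placeholders for operator-specific logic: reading the old layout A, transforming it into the new layout B, updating the new state handle, and clearing the old one afterwards.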

Operator migration is meant for smaller, "more local" changes that don't touch the ExecNode layer. The CEP library and DataStream API sources have been performing operator migration for years already.


> `supportedPlanChanges` and `supportedSavepointChanges` are a bit obscure.

Let me try to come up with more examples of why I think both annotations make sense and are especially important *for test coverage*.

supportedPlanChanges:

Let's assume we have some JSON in Flink 1.15:

{
  "some-prop": 42
}

And we want to extend the JSON in Flink 1.16:

{
  "some-prop": 42,
  "some-flag": false
}

Maybe we don't need to increase the ExecNode version but only ensure that the flag is set to `false` by default for the older versions.

We need a location to track such changes and document the changelog. With the help of the annotation `supportedPlanChanges = [1.15, 1.16]` we can verify that we have tests for both JSON formats.

And once we decide to drop the 1.15 format, we enforce plan migration: we fill in the default value `false` into the old plans and bump their JSON plan version to 1.16 or higher.
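
As a purely illustrative sketch, the migrated plan would then look like the following (the top-level version field is an assumption here, not the final format):

{
  "flinkVersion": "1.16",
  "some-prop": 42,
  "some-flag": false
}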



> once the state layout is changed, the ExecNode version also needs to be updated

This will still be the majority of cases. But where we can avoid a new version, we should, so that we don't have too much duplicated code to maintain.



Thanks,
Timo


On 06.12.21 09:58, godfrey he wrote:
Hi, Timo,

Thanks for the detailed explanation.

We change an operator state of B in Flink 1.16. We perform the change in the 
operator of B in a way to support both state layouts. Thus, no need for a new 
ExecNode version.

I think this design makes things more complex.
1. If there are multiple state layouts, which layout should the ExecNode use?
It increases the cost of understanding for developers (especially for
Flink newcomers), making them prone to mistakes.
2. `supportedPlanChanges` and `supportedSavepointChanges` are a bit obscure.

The purpose of the ExecNode annotations is not only to support powerful validation,
but more importantly to make it easy for developers to understand
and to ensure that every modification is easy and state-compatible.

I prefer that once the state layout is changed, the ExecNode version
also needs to be updated, which keeps things simple. How about
renaming `supportedPlanChanges` to `planCompatibleVersion`
(which means the plan is compatible with the plan generated by the
node of the given version)
and renaming `supportedSavepointChanges` to `savepointCompatibleVersion`
(which means the state is compatible with the state generated by the
node of the given version)?
The names also indicate that only one version value can be set.

WDYT?

Best,
Godfrey


Timo Walther <twal...@apache.org> wrote on Thursday, December 2, 2021 at 11:42 PM:

Response to Marios's feedback:

  > there should be some good logging in place when the upgrade is taking place

Yes, I agree. I added this part to the FLIP.

  > config option instead that doesn't provide the flexibility to overwrite certain plans

One can set the config option also around sections of the
multi-statement SQL script.

SET 'table.plan.force-recompile'='true';

COMPILE ...

SET 'table.plan.force-recompile'='false';

But the question is why a user would want to run COMPILE multiple times. If it is during development, then running EXECUTE (or just the statement itself) without calling COMPILE should be sufficient. The file can also be deleted manually if necessary.
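
For example (table names and path are placeholders; the COMPILE PLAN syntax is the one sketched in the FLIP):

-- during development: run the statement directly, no plan file involved
INSERT INTO MySink SELECT * FROM MySource;

-- once the query is stable: compile the plan once for later upgrades
COMPILE PLAN '/plans/my-query.json' FOR INSERT INTO MySink SELECT * FROM MySource;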

What do you think?

Regards,
Timo



On 02.12.21 16:09, Timo Walther wrote:
Hi Till,

Yes, you might have to. But not a new plan from the SQL query but a
migration from the old plan to the new plan. This will not happen often.
But we need a way to evolve the format of the JSON plan itself.

Maybe this is a bit confusing, so let me clarify it again: mostly, ExecNode versions and operator state layouts will evolve. The plan files will not; those will be pretty stable, but not infinitely so.

Regards,
Timo


On 02.12.21 16:01, Till Rohrmann wrote:
Then for migrating from Flink 1.10 to 1.12, I might have to create a new
plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?

Cheers,
Till

On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:

Response to Till's feedback:

   > compiled plan won't be changed after being written initially

This is not entirely correct. We give guarantees for keeping the query up and running. We reserve the right to force plan migrations. In this case, the plan might not be created from the SQL statement but from the old plan. I have added an example in section 10.1.1. In general, both persisted entities, "plan" and "savepoint", can evolve independently of each other.

Thanks,
Timo

On 02.12.21 15:10, Timo Walther wrote:
Response to Godfrey's feedback:

   > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.

Thanks for the hint. I added a dedicated section 7.1.3.


  > it's hard to maintain the supported versions for "supportedPlanChanges" and "supportedSavepointChanges"

Actually, I think we are mostly on the same page.

The annotation does not need to be updated for every Flink version. As the name suggests, it is about "changes" (in other words: incompatibilities) that require some kind of migration: either plan migration (= PlanChanges) or savepoint migration (= SavepointChanges, using operator migration or savepoint migration).

Let's assume we introduced two ExecNodes A and B in Flink 1.15.

The annotations are:

@ExecNodeMetadata(name=A, supportedPlanChanges=[1.15], supportedSavepointChanges=[1.15])

@ExecNodeMetadata(name=B, supportedPlanChanges=[1.15], supportedSavepointChanges=[1.15])

We change an operator state of B in Flink 1.16.

We perform the change in the operator of B in a way to support both
state layouts. Thus, no need for a new ExecNode version.

The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=[1.15], supportedSavepointChanges=[1.15])

@ExecNodeMetadata(name=B, supportedPlanChanges=[1.15], supportedSavepointChanges=[1.15, 1.16])

So the versions in the annotations are "start versions".

I don't think we need end versions? End version would mean that we drop
the ExecNode from the code base?

Please check the section 10.1.1 again. I added a more complex example.


Thanks,
Timo



On 01.12.21 16:29, Timo Walther wrote:
Response to Francesco's feedback:

  > *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan must fail hard

Yes, I totally agree. We will fail hard with a helpful exception. Any mistake, e.g. using an inline object in Table API or an invalid DataStream API source without a uid, should immediately fail the plan compilation step. I added a remark to the FLIP again.

  > What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats

Breaking changes in connectors and formats need to be encoded in the options. I could also imagine versioning the factory identifier, e.g. `connector=kafka` and `connector=kafka-2`, if this becomes necessary.
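
Purely as an illustration (the `kafka-2` identifier and all table details are hypothetical):

CREATE TABLE MyTopic (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'kafka-2',  -- hypothetical versioned factory identifier
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);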

After thinking about your question again, I think we will also need the same testing infrastructure for our connectors and formats, especially restore tests and completeness tests. I updated the document accordingly. I also added a way to generate UIDs for DataStream API providers.

  > *Functions:* Are we talking about the function name or the function's complete signature?

For catalog functions, the identifier contains the catalog name and database name. For system functions, the identifier contains only a name, which makes function name and identifier identical. I reworked the section again and also fixed some of the naming conflicts you mentioned.

  > we should perhaps use a logically defined unique id like /bigIntToTimestamp/

I added a concrete example for the resolution and restoration. The
unique id is composed of name + version. Internally, this is
represented as `$TO_TIMESTAMP_LTZ$1`.
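
Purely as an illustrative sketch (the field names are assumptions here, not the final serialization format), such a versioned call could then be referenced in the plan along these lines:

{
  "kind": "CALL",
  "internalName": "$TO_TIMESTAMP_LTZ$1",
  "operands": [ ... ]
}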

   > I think we should rather keep JSON out of the concept

Sounds ok to me. In SQL we also just call it "plan". I will change the file sections, but I would suggest keeping the fromJsonString method.

   > write it back in the original plan file

I updated the terminology section for what we consider an "upgrade". We might need to update the original plan file. This is already considered in COMPILE PLAN ... FROM ..., even though this is future work, as is savepoint migration.

Thanks for all the feedback!

Timo


On 30.11.21 14:28, Timo Walther wrote:
Response to Wenlongs's feedback:

  > I would prefer not to provide such a shortcut, let users use COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by new users even without inferring the docs.

I would like to hear more opinions on this topic. Personally, I find
a combined statement very useful. Not only for quicker development
and debugging but also for readability. It helps in keeping the JSON
path and the query close to each other in order to know the origin of
the plan.
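
For illustration, such a combined statement could look like this (path and query are placeholders; the exact syntax is whatever the FLIP settles on):

COMPILE AND EXECUTE PLAN '/plans/my-query.json'
FOR INSERT INTO MySink SELECT * FROM MySource;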

  > but the plan and SQL are not matched. The result would be quite confusing if we still execute the plan directly, we may need to add a validation.

You are right that there could be a mismatch. But we have a similar
problem when executing CREATE TABLE IF NOT EXISTS. The schema or
options of a table could have changed completely in the catalog but
the CREATE TABLE IF NOT EXISTS is not executed again. So a mismatch
could also occur there.
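
For illustration (table name and options are placeholders), the analogous situation:

-- first run: creates the table in the catalog
CREATE TABLE IF NOT EXISTS MyTable (i INT) WITH ('connector' = 'datagen');

-- if the catalog table is altered later, re-running the statement is a
-- no-op and the mismatch goes unnoticed
CREATE TABLE IF NOT EXISTS MyTable (i INT) WITH ('connector' = 'datagen');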

Regards,
Timo

On 30.11.21 14:17, Timo Walther wrote:
Hi everyone,

thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

   > Will the JSON plan's schema be considered an API?

No, not in the first version. This is explicitly mentioned in the `General JSON Plan Assumptions`. I tried to improve the section once more to make it clearer. However, the JSON plan is definitely stable per minor version. And since the plan is versioned by Flink version, external tooling could be built around it. We might make it public API once the design has settled.

  > Given that upgrades across multiple versions at once are unsupported, do we verify this somehow?

Good question. I extended the `General JSON Plan Assumptions`. Now yes: the Flink version is part of the JSON plan and will be verified during restore. But keep in mind that we might support more than just the last version, at least until the JSON plan has been migrated.

Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:
I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go with the config option instead, that doesn't provide the flexibility to overwrite certain plans but not others, since the config applies globally. Isn't that something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com> wrote:

Hi Timo!

Thanks a lot for taking all that time and effort to put together
this
proposal!

Regarding:
For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

I think that for this first step we should make it absolutely
clear to the
users that they would need to go through all intermediate versions
to end up with the target version they wish. If we are to support
skipping
versions in the future, i.e. upgrade from 1.14 to 1.17, this means
that we need to have a testing infrastructure in place that would
test all
possible combinations of version upgrades, i.e. from 1.14 to 1.15,
from 1.14 to 1.16 and so forth, while still testing and of course
supporting all the upgrades from the previous minor version.

I like a lot the idea of introducing HINTS to define some
behaviour in the
programs!
- the hints live together with the sql statements and consequently
the
(JSON) plans.
- If multiple queries are involved in a program, each one of them
can
define its own config (regarding plan optimisation, not null
enforcement,
etc)

I agree with Francesco on his argument regarding the *JSON*
plan. I
believe we should already provide flexibility here, since (who
knows) in
the future
a JSON plan might not fulfil the desired functionality.

I also agree that we need some very obvious way (i.e. not a log entry) to show the users that their program doesn't support version upgrades, and prevent them from being negatively surprised in the future when trying to upgrade their production pipelines.

This is an implementation detail, but I'd like to add that there
should be
some good logging in place when the upgrade is taking place, to be
able to track every restoration action, and help debug any
potential
issues arising from that.





On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for writing this FLIP Timo. I think this will be a very important improvement for Flink and our SQL users :-)

Similar to Francesco I would like to understand the statement

For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

a bit better. Is it because Flink does not guarantee that a savepoint created by version 1.x can be directly recovered by version 1.y with x + 1 < y, but users might have to go through a cascade of upgrades? From how I understand your proposal, the compiled plan won't be changed after being written initially. Hence, I would assume that for the plan alone, Flink will have to give backwards compatibility guarantees for all versions. Am I understanding this part correctly?

Cheers,
Till

On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <france...@ververica.com> wrote:

Hi Timo,

Thanks for putting this amazing work together. I have some considerations/questions about the FLIP:
*Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with objects that cannot be serialized in the plan fails hard, so users don't bite themselves with such issues, or at least we need to output warning logs. In general, whenever the user is using the CompiledPlan APIs and at the same time trying to do something "illegal" for the plan, we should immediately either log or fail depending on the issue, in order to avoid any surprises once the user upgrades. I would also say the same for things like registering a function, registering a DataStream, and every other thing which won't end up in the plan: we should log such info to the user by default.

*General JSON Plan Assumptions #9:* When thinking about connectors and formats, I think it's reasonable to assume, and keep out of the feature design, that no feature/ability can be deleted from a connector/format. I also don't think new features/abilities can influence this FLIP, given the plan is static: if, for example, MyCoolTableSink in the next Flink version implements SupportsProjectionsPushDown, then it shouldn't be a problem for the upgrade story, since the plan is still configured as computed from the previous Flink version. What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats. Although this argument doesn't seem relevant for the connectors shipped by the Flink project itself, because we try to keep them as stable as possible and avoid eventual breaking changes, it's compelling for external connectors and formats, which might be decoupled from the Flink release cycle and might have different backward compatibility guarantees. It's totally reasonable if we don't want to tackle it in this first iteration of the feature, but it's something we need to keep in mind for the future.


*Functions:* It's not clear to me what you mean by "identifier", because somewhere else in the same context you talk about "name". Are we talking about the function name or the function's complete signature? Let's assume for example we have these function definitions:


* TO_TIMESTAMP_LTZ(BIGINT)
* TO_TIMESTAMP_LTZ(STRING)
* TO_TIMESTAMP_LTZ(STRING, STRING)

These are, for me, very different functions with different implementations, where each of them might evolve separately at a different pace. Hence, when we store them in the JSON plan we should perhaps use a logically defined unique id like /bigIntToTimestamp/, /stringToTimestamp/ and /stringToTimestampWithFormat/. This also solves the issue of correctly referencing the functions when restoring the plan, without running the inference logic again (which might have changed in the meantime), and it might also solve the versioning, that is, the function identifier can contain the function version like /stringToTimestampWithFormat_1_1/ or /stringToTimestampWithFormat_1_2/. An alternative could be to use the string signature representation, which might not be trivial to compute, given the complexity of our type inference logic.

*The term "JSON plan"*: I think we should rather keep JSON out
of the
concept and just
name it "Compiled Plan" (like the proposed API) or something
similar,
as I
see how in
future we might decide to support/modify our persistence
format to
something more
efficient storage wise like BSON. For example, I would rename /
CompiledPlan.fromJsonFile/ to simply /CompiledPlan.fromFile/.

*Who is the owner of the plan file?* I asked myself this
question when
reading this:


For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

My understanding of this statement is that a user can upgrade between minors, and by following all the minors, the same query can remain up and running. E.g. I upgrade from 1.15 to 1.16, and then from 1.16 to 1.17, and I still expect my original query to work without recomputing the plan. This necessarily means that at some point in future releases we'll need some basic "migration" tool to keep the queries up and running, which would end up modifying the compiled plan. So I guess Flink should write it back in the original plan file, perhaps doing a backup of the previous one? Can you please clarify this aspect?

Except for these considerations, the proposal looks good to me and I'm eagerly waiting to see it in play.

Thanks,
FG


--
Francesco Guardiani | Software Engineer
france...@ververica.com[1]

Follow us @VervericaData
--
Join Flink Forward[2] - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park
Tung
Jason,
Jinwei (Kevin)
Zhang

--------
[1] mailto:france...@ververica.com
[2] https://flink-forward.org/




--
Marios