Hi Timo,

Thanks for the explanation, it's much clearer now.

One thing I want to confirm about `supportedPlanFormat `
and `supportedSavepointFormat `:
`supportedPlanFormat ` supports multiple versions,
while `supportedSavepointFormat ` supports only one version ?
A json plan  can be deserialized by multiple versions
because default value will be set for new fields.
In theory, a Savepoint can be restored by more than one version
of the operators even if a state layout is changed,
such as deleting a whole state and starting job with
`allowNonRestoredState`=true.
I think this is a corner case, and it's hard to understand comparing
to `supportedPlanFormat ` supporting multiple versions.
So, for most cases, when the state layout is changed, the savepoint is
incompatible,
and `supportedSavepointFormat` and version need to be changed.

I think we need a detail explanation about the annotations change story in
the java doc of  `ExecNodeMetadata` class for all developers
(esp. those unfamiliar with this part).

Best,
Godfrey

Timo Walther <twal...@apache.org> 于2021年12月8日周三 下午4:57写道:
>
> Hi Wenlong,
>
> thanks for the feedback. Great that we reached consensus here. I will
> update the entire document with my previous example shortly.
>
>  > if we don't update the version when plan format changes, we can't
> find that the plan can't not be deserialized in 1.15
>
> This should not be a problem as the entire plan file has a version as
> well. We should not allow reading a 1.16 plan in 1.15. We can throw a
> helpful exception early.
>
> Reading a 1.15 plan in 1.16 is possible until we drop the old
> `supportedPlanFormat` from one of used ExecNodes. Afterwards all
> `supportedPlanFormat` of ExecNodes must be equal or higher then the plan
> version.
>
> Regards,
> Timo
>
> On 08.12.21 03:07, wenlong.lwl wrote:
> > Hi, Timo,  +1 for multi metadata.
> >
> > The compatible change I mean in the last email is the slight state change
> > example you gave, so we have got  consensus on this actually, IMO.
> >
> > Another question based on the example you gave:
> > In the example "JSON node gets an additional property in 1.16", if we don't
> > update the version when plan format changes, we can't find that the plan
> > can't not be deserialized in 1.15, although the savepoint state is
> > compatible.
> > The error message may be not so friendly if we just throw deserialization
> > failure.
> >
> > On Tue, 7 Dec 2021 at 16:49, Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Wenlong,
> >>
> >>   > First,  we add a newStateLayout because of some improvement in state, 
> >> in
> >>   > order to keep compatibility we may still keep the old state for the
> >> first
> >>   > version. We need to update the version, so that we can generate a new
> >>   > version plan for the new job and keep the exec node compatible with
> >> the old
> >>   > version plan.
> >>
> >> The problem that I see here for contributors is that the actual update
> >> of a version is more complicated than just updating an integer value. It
> >> means copying a lot of ExecNode code for a change that happens locally
> >> in an operator. Let's assume multiple ExecNodes use a similar operator.
> >> Why do we need to update all ExecNode versions, if the operator itself
> >> can deal with the incompatibility. The ExecNode version is meant for
> >> topology changes or fundamental state changes.
> >>
> >> If we don't find consensus on this topic, I would at least vote for
> >> supporting multiple annotations for an ExecNode class. This way we don't
> >> need to copy code but only add two ExecNode annotations with different
> >> ExecNode versions.
> >>
> >>   > Maybe we can add support for this case :
> >>   > when an exec node is changed in 1.16, but is compatible with 1.15,
> >>   > we can use the node of 1.16 to deserialize the plan of 1.15.
> >>
> >> If the ExecNode is compatible, there is no reason to increase the
> >> ExecNode version.
> >>
> >>
> >>
> >> I tried to come up with a reworked solution to make all parties happy:
> >>
> >> 1. Let's assume the following annotations:
> >>
> >> supportedPlanFormat = [1.15]
> >>
> >> supportedSavepointFormat = 1.15
> >>
> >> we drop `added` as it is equal to `supportedSavepointFormat`
> >>
> >> 2. Multiple annotations over ExecNodes are possible:
> >>
> >> // operator state changes
> >>
> >> // initial introduction in 1.15
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.15)
> >>
> >> // state layout changed slightly in 1.16
> >> // - operator migration is possible
> >> // - operator supports state of both versions and will perform operator
> >> state migration
> >> // - new plans will get new ExecNode version
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.15)
> >> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.16)
> >>
> >> // we force a plan migration in 1.17
> >> // - we assume that all operator states have been migrated in the
> >> previous version
> >> // - we can safely replace the old version `1` with `2` and only keep
> >> the new savepoint format
> >> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.16)
> >>
> >>
> >> // plan changes
> >>
> >> // initial introduction in 1.15
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.15)
> >>
> >> // JSON node gets an additional property in 1.16
> >> // e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false}
> >> // - ExecNode version does not change
> >> // - ExecNode version only changes when topology or state is affected
> >> // - we support both JSON plan formats, the old and the newest one
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16],
> >> supportedSavepointFormat=1.15)
> >>
> >> // we force a plan migration in 1.17
> >> // - now we only support 1.16 plan format
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16,
> >> supportedSavepointFormat=1.15)
> >>
> >>
> >> // topology change
> >>
> >> // initial introduction in 1.15
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.15)
> >>
> >> // complete new class structure in 1.16 annotated with
> >> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >> supportedSavepointFormat=1.16)
> >>
> >>
> >>
> >> What do you think?
> >>
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 07.12.21 08:20, wenlong.lwl wrote:
> >>> Maybe we can add support for this case :
> >>>           when an exec node is changed in 1.16, but is compatible with
> >> 1.15,
> >>> we can use the node of 1.16 to deserialize the plan of 1.15.
> >>> By this way, we don't need to fork the code if the change is compatible,
> >>> and can avoid fork code frequently.
> >>>
> >>>
> >>> Best,
> >>> Wenlong
> >>>
> >>>
> >>> On Tue, 7 Dec 2021 at 15:08, wenlong.lwl <wenlong88....@gmail.com>
> >> wrote:
> >>>
> >>>> hi, Timo, I would prefer to update the version every time we change the
> >>>> state layer too.
> >>>>
> >>>> It could be possible that we change the exec node in 2 steps:
> >>>> First,  we add a newStateLayout because of some improvement in state, in
> >>>> order to keep compatibility we may still keep the old state for the
> >> first
> >>>> version. We need to update the version, so that we can generate a new
> >>>> version plan for the new job and keep the exec node compatible with the
> >> old
> >>>> version plan.
> >>>> After some versions, we may remove the old version state layout and
> >> clean
> >>>> up the deprecated code. We still need to update the version, so that we
> >> can
> >>>> verify that we are compatible with the plan after the first change, but
> >> not
> >>>> compatible with the plan earlier.
> >>>>
> >>>>
> >>>> Best,
> >>>> Wenlong
> >>>>
> >>>> On Mon, 6 Dec 2021 at 21:27, Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>>> Hi Godfrey,
> >>>>>
> >>>>>    > design makes thing more complex.
> >>>>>
> >>>>> Yes, the design might be a bit more complex. But operator migration is
> >>>>> way easier than ExecNode migration at a later point in time for code
> >>>>> maintenance. We know that ExecNodes can become pretty complex. Even
> >>>>> though we have put a lot of code into `CommonXXExecNode` it will be a
> >>>>> lot of work to maintain multiple versions of ExecNodes. If we can avoid
> >>>>> this with operator state migration, this should always be preferred
> >> over
> >>>>> a new ExecNode version.
> >>>>>
> >>>>> I'm aware that operator state migration might only be important for
> >>>>> roughly 10 % of all changes. A new ExecNode version will be used for
> >> 90%
> >>>>> of all changes.
> >>>>>
> >>>>>    > If there are multiple state layouts, which layout the ExecNode
> >> should
> >>>>> use?
> >>>>>
> >>>>> It is not the responsibility of the ExecNode to decide this but the
> >>>>> operator. Something like:
> >>>>>
> >>>>> class X extends ProcessFunction {
> >>>>>      ValueState<A> oldStateLayout;
> >>>>>      ValueState<B> newStateLayout;
> >>>>>
> >>>>>      open() {
> >>>>>        if (oldStateLayout.get() != null) {
> >>>>>          performOperatorMigration();
> >>>>>        }
> >>>>>        useNewStateLayout();
> >>>>>      }
> >>>>> }
> >>>>>
> >>>>> Operator migration is meant for smaller "more local" changes without
> >>>>> touching the ExecNode layer. The CEP library and DataStream API sources
> >>>>> are performing operator migration for years already.
> >>>>>
> >>>>>
> >>>>>    > `supportedPlanChanges ` and `supportedSavepointChanges ` are a bit
> >>>>> obscure.
> >>>>>
> >>>>> Let me try to come up with more examples why I think both annotation
> >>>>> make sense and are esp. important *for test coverage*.
> >>>>>
> >>>>> supportedPlanChanges:
> >>>>>
> >>>>> Let's assume we have some JSON in Flink 1.15:
> >>>>>
> >>>>> {
> >>>>>      some-prop: 42
> >>>>> }
> >>>>>
> >>>>> And we want to extend the JSON in Flink 1.16:
> >>>>>
> >>>>> {
> >>>>>      some-prop: 42,
> >>>>>      some-flag: false
> >>>>> }
> >>>>>
> >>>>> Maybe we don't need to increase the ExecNode version but only ensure
> >>>>> that the flag is set to `false` by default for the older versions.
> >>>>>
> >>>>> We need a location to track changes and document the changelog. With
> >> the
> >>>>> help of the annotation supportedPlanChanges = [1.15, 1.16] we can
> >> verify
> >>>>> that we have tests for both JSON formats.
> >>>>>
> >>>>> And once we decide to drop the 1.15 format, we enforce plan migration
> >>>>> and fill-in the default value `false` into the old plans and bump their
> >>>>> JSON plan version to 1.16 or higher.
> >>>>>
> >>>>>
> >>>>>
> >>>>>    > once the state layout is changed, the ExecNode version needs also
> >> be
> >>>>> updated
> >>>>>
> >>>>> This will still be the majority of cases. But if we can avoid this, we
> >>>>> should do it for not having too much duplicate code to maintain.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Thanks,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 06.12.21 09:58, godfrey he wrote:
> >>>>>> Hi, Timo,
> >>>>>>
> >>>>>> Thanks for the detailed explanation.
> >>>>>>
> >>>>>>> We change an operator state of B in Flink 1.16. We perform the change
> >>>>> in the operator of B in a way to support both state layouts. Thus, no
> >> need
> >>>>> for a new ExecNode version.
> >>>>>>
> >>>>>> I think this design makes thing more complex.
> >>>>>> 1. If there are multiple state layouts, which layout the ExecNode
> >>>>> should use ?
> >>>>>> It increases the cost of understanding for developers (especially for
> >>>>>> Flink newer),
> >>>>>> making them prone to mistakes.
> >>>>>> 2. `supportedPlanChanges ` and `supportedSavepointChanges ` are a bit
> >>>>> obscure.
> >>>>>>
> >>>>>> The purpose of ExecNode annotations are not only to support powerful
> >>>>> validation,
> >>>>>> but more importantly to make it easy for developers to understand
> >>>>>> to ensure that every modification is easy and state compatible.
> >>>>>>
> >>>>>> I prefer, once the state layout is changed, the ExecNode version needs
> >>>>>> also be updated.
> >>>>>> which could make thing simple. How about
> >>>>>> rename `supportedPlanChanges ` to `planCompatibleVersion`
> >>>>>> (which means the plan is compatible with the plan generated by the
> >>>>>> given version node)
> >>>>>>     and rename `supportedSavepointChanges` to
> >> `savepointCompatibleVersion
> >>>>> `
> >>>>>> (which means the state is compatible with the state generated by the
> >>>>>> given version node) ?
> >>>>>> The names also indicate that only one version value can be set.
> >>>>>>
> >>>>>> WDYT?
> >>>>>>
> >>>>>> Best,
> >>>>>> Godfrey
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Timo Walther <twal...@apache.org> 于2021年12月2日周四 下午11:42写道:
> >>>>>>>
> >>>>>>> Response to Marios's feedback:
> >>>>>>>
> >>>>>>>     > there should be some good logging in place when the upgrade is
> >>>>> taking
> >>>>>>> place
> >>>>>>>
> >>>>>>> Yes, I agree. I added this part to the FLIP.
> >>>>>>>
> >>>>>>>     > config option instead that doesn't provide the flexibility to
> >>>>>>> overwrite certain plans
> >>>>>>>
> >>>>>>> One can set the config option also around sections of the
> >>>>>>> multi-statement SQL script.
> >>>>>>>
> >>>>>>> SET 'table.plan.force-recompile'='true';
> >>>>>>>
> >>>>>>> COMPILE ...
> >>>>>>>
> >>>>>>> SET 'table.plan.force-recompile'='false';
> >>>>>>>
> >>>>>>> But the question is why a user wants to run COMPILE multiple times.
> >> If
> >>>>>>> it is during development, then running EXECUTE (or just the statement
> >>>>>>> itself) without calling COMPILE should be sufficient. The file can
> >> also
> >>>>>>> manually be deleted if necessary.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 02.12.21 16:09, Timo Walther wrote:
> >>>>>>>> Hi Till,
> >>>>>>>>
> >>>>>>>> Yes, you might have to. But not a new plan from the SQL query but a
> >>>>>>>> migration from the old plan to the new plan. This will not happen
> >>>>> often.
> >>>>>>>> But we need a way to evolve the format of the JSON plan itself.
> >>>>>>>>
> >>>>>>>> Maybe this confuses a bit, so let me clarify it again: Mostly
> >> ExecNode
> >>>>>>>> versions and operator state layouts will evolve. Not the plan files,
> >>>>>>>> those will be pretty stable. But also not infinitely.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 02.12.21 16:01, Till Rohrmann wrote:
> >>>>>>>>> Then for migrating from Flink 1.10 to 1.12, I might have to create
> >> a
> >>>>> new
> >>>>>>>>> plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12,
> >>>>> right?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Till
> >>>>>>>>>
> >>>>>>>>> On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org>
> >>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Response to Till's feedback:
> >>>>>>>>>>
> >>>>>>>>>>      > compiled plan won't be changed after being written initially
> >>>>>>>>>>
> >>>>>>>>>> This is not entirely correct. We give guarantees for keeping the
> >>>>> query
> >>>>>>>>>> up and running. We reserve us the right to force plan migrations.
> >> In
> >>>>>>>>>> this case, the plan might not be created from the SQL statement
> >> but
> >>>>> from
> >>>>>>>>>> the old plan. I have added an example in section 10.1.1. In
> >> general,
> >>>>>>>>>> both persisted entities "plan" and "savepoint" can evolve
> >>>>> independently
> >>>>>>>>>> from each other.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> On 02.12.21 15:10, Timo Walther wrote:
> >>>>>>>>>>> Response to Godfrey's feedback:
> >>>>>>>>>>>
> >>>>>>>>>>>      > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is
> >> missing.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the hint. I added a dedicated section 7.1.3.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>      > it's hard to maintain the supported versions for
> >>>>>>>>>>> "supportedPlanChanges" and "supportedSavepointChanges"
> >>>>>>>>>>>
> >>>>>>>>>>> Actually, I think we are mostly on the same page.
> >>>>>>>>>>>
> >>>>>>>>>>> The annotation does not need to be updated for every Flink
> >>>>> version. As
> >>>>>>>>>>> the name suggests it is about "Changes" (in other words:
> >>>>>>>>>>> incompatibilities) that require some kind of migration. Either
> >> plan
> >>>>>>>>>>> migration (= PlanChanges) or savepoint migration
> >>>>> (=SavepointChanges,
> >>>>>>>>>>> using operator migration or savepoint migration).
> >>>>>>>>>>>
> >>>>>>>>>>> Let's assume we introduced two ExecNodes A and B in Flink 1.15.
> >>>>>>>>>>>
> >>>>>>>>>>> The annotations are:
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>
> >>>>>>>>>>> We change an operator state of B in Flink 1.16.
> >>>>>>>>>>>
> >>>>>>>>>>> We perform the change in the operator of B in a way to support
> >> both
> >>>>>>>>>>> state layouts. Thus, no need for a new ExecNode version.
> >>>>>>>>>>>
> >>>>>>>>>>> The annotations in 1.16 are:
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>> supportedSavepointChanges=1.15, 1.16)
> >>>>>>>>>>>
> >>>>>>>>>>> So the versions in the annotations are "start version"s.
> >>>>>>>>>>>
> >>>>>>>>>>> I don't think we need end versions? End version would mean that
> >> we
> >>>>> drop
> >>>>>>>>>>> the ExecNode from the code base?
> >>>>>>>>>>>
> >>>>>>>>>>> Please check the section 10.1.1 again. I added a more complex
> >>>>> example.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Timo
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 01.12.21 16:29, Timo Walther wrote:
> >>>>>>>>>>>> Response to Francesco's feedback:
> >>>>>>>>>>>>
> >>>>>>>>>>>>      > *Proposed changes #6*: Other than defining this rule of
> >>>>> thumb, we
> >>>>>>>>>>>> must also make sure that compiling plans with these objects that
> >>>>>>>>>>>> cannot be serialized in the plan must fail hard
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, I totally agree. We will fail hard with a helpful
> >> exception.
> >>>>> Any
> >>>>>>>>>>>> mistake e.g. using a inline object in Table API or an invalid
> >>>>>>>>>>>> DataStream API source without uid should immediately fail a plan
> >>>>>>>>>>>> compilation step. I added a remark to the FLIP again.
> >>>>>>>>>>>>
> >>>>>>>>>>>>      > What worries me is breaking changes, in particular
> >>>>> behavioural
> >>>>>>>>>>>> changes that might happen in connectors/formats
> >>>>>>>>>>>>
> >>>>>>>>>>>> Breaking changes in connectors and formats need to be encoded in
> >>>>> the
> >>>>>>>>>>>> options. I could also imagine to versioning in the factory
> >>>>> identifier
> >>>>>>>>>>>> `connector=kafka` and `connector=kafka-2`. If this is necessary.
> >>>>>>>>>>>>
> >>>>>>>>>>>> After thinking about your question again, I think we will also
> >>>>> need
> >>>>>>>>>>>> the same testing infrastructure for our connectors and formats.
> >>>>> Esp.
> >>>>>>>>>>>> restore tests and completeness test. I updated the document
> >>>>>>>>>>>> accordingly. Also I added a way to generate UIDs for DataStream
> >>>>> API
> >>>>>>>>>>>> providers.
> >>>>>>>>>>>>
> >>>>>>>>>>>>      > *Functions:* Are we talking about the function name or the
> >>>>>>>>>>>> function
> >>>>>>>>>>>> complete signature?
> >>>>>>>>>>>>
> >>>>>>>>>>>> For catalog functions, the identifier contains catalog name and
> >>>>>>>>>>>> database name. For system functions, identifier contains only a
> >>>>> name
> >>>>>>>>>>>> which make function name and identifier identical. I reworked
> >> the
> >>>>>>>>>>>> section again and also fixed some of the naming conflicts you
> >>>>>>>>>>>> mentioned.
> >>>>>>>>>>>>
> >>>>>>>>>>>>      > we should perhaps use a logically defined unique id like
> >>>>>>>>>>>> /bigIntToTimestamp/
> >>>>>>>>>>>>
> >>>>>>>>>>>> I added a concrete example for the resolution and restoration.
> >> The
> >>>>>>>>>>>> unique id is composed of name + version. Internally, this is
> >>>>>>>>>>>> represented as `$TO_TIMESTAMP_LTZ$1`.
> >>>>>>>>>>>>
> >>>>>>>>>>>>      > I think we should rather keep JSON out of the concept
> >>>>>>>>>>>>
> >>>>>>>>>>>> Sounds ok to me. In SQL we also just call it "plan". I will
> >>>>> change the
> >>>>>>>>>>>> file sections. But would suggest to keep the fromJsonString
> >>>>> method.
> >>>>>>>>>>>>
> >>>>>>>>>>>>      > write it back in the original plan file
> >>>>>>>>>>>>
> >>>>>>>>>>>> I updated the terminology section for what we consider an
> >>>>> "upgrade".
> >>>>>>>>>>>> We might need to update the orginal plan file. This is already
> >>>>>>>>>>>> considered in the COMPILE PLAN ... FROM ... even though this is
> >>>>> future
> >>>>>>>>>>>> work. Also savepoint migration.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for all the feedback!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 30.11.21 14:28, Timo Walther wrote:
> >>>>>>>>>>>>> Response to Wenlongs's feedback:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>      > I would prefer not to provide such a shortcut, let users
> >> use
> >>>>>>>>>>>>> COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be
> >>>>>>>>>>>>> understood by new users even without inferring the docs.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I would like to hear more opinions on this topic. Personally, I
> >>>>> find
> >>>>>>>>>>>>> a combined statement very useful. Not only for quicker
> >>>>> development
> >>>>>>>>>>>>> and debugging but also for readability. It helps in keeping the
> >>>>> JSON
> >>>>>>>>>>>>> path and the query close to each other in order to know the
> >>>>> origin of
> >>>>>>>>>>>>> the plan.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>      > but the plan and SQL are not matched. The result would be
> >>>>> quite
> >>>>>>>>>>>>> confusing if we still execute the plan directly, we may need to
> >>>>> add a
> >>>>>>>>>>>>> validation.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> You are right that there could be a mismatch. But we have a
> >>>>> similar
> >>>>>>>>>>>>> problem when executing CREATE TABLE IF NOT EXISTS. The schema
> >> or
> >>>>>>>>>>>>> options of a table could have changed completely in the catalog
> >>>>> but
> >>>>>>>>>>>>> the CREATE TABLE IF NOT EXISTS is not executed again. So a
> >>>>> mismatch
> >>>>>>>>>>>>> could also occur there.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 30.11.21 14:17, Timo Walther wrote:
> >>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> thanks for the feedback so far. Let me answer each email
> >>>>>>>>>>>>>> indvidually.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will start with a response to Ingo's feedback:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>      > Will the JSON plan's schema be considered an API?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> No, not in the first version. This is explicitly mentioned in
> >>>>> the
> >>>>>>>>>>>>>> `General JSON Plan Assumptions`. I tried to improve the
> >> section
> >>>>> once
> >>>>>>>>>>>>>> more to make it clearer. However, the JSON plan is definitely
> >>>>> stable
> >>>>>>>>>>>>>> per minor version. And since the plan is versioned by Flink
> >>>>> version,
> >>>>>>>>>>>>>> external tooling could be build around it. We might make it
> >>>>> public
> >>>>>>>>>>>>>> API once the design has settled.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>      > Given that upgrades across multiple versions at once are
> >>>>>>>>>>>>>> unsupported, do we verify this somehow?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Good question. I extended the `General JSON Plan Assumptions`.
> >>>>> Now
> >>>>>>>>>>>>>> yes: the Flink version is part of the JSON plan and will be
> >>>>> verified
> >>>>>>>>>>>>>> during restore. But keep in mind that we might support more
> >> that
> >>>>>>>>>>>>>> just the last version at least until the JSON plan has been
> >>>>>>>>>>>>>> migrated.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>>>>>>>>>>>> I have a question regarding the `COMPILE PLAN OVEWRITE`. If
> >> we
> >>>>>>>>>>>>>>> choose to go
> >>>>>>>>>>>>>>> with the config option instead,
> >>>>>>>>>>>>>>> that doesn't provide the flexibility to overwrite certain
> >>>>> plans but
> >>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>> others, since the config applies globally, isn't that
> >>>>>>>>>>>>>>> something to consider?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <
> >>>>> mat...@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Timo!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks a lot for taking all that time and effort to put
> >>>>> together
> >>>>>>>>>> this
> >>>>>>>>>>>>>>>> proposal!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regarding:
> >>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades
> >>>>> use a
> >>>>>>>>>>>>>>>>> step size
> >>>>>>>>>>>>>>>> of a single
> >>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor versions
> >>>>> (e.g.
> >>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I think that for this first step we should make it
> >> absolutely
> >>>>>>>>>>>>>>>> clear to the
> >>>>>>>>>>>>>>>> users that they would need to go through all intermediate
> >>>>> versions
> >>>>>>>>>>>>>>>> to end up with the target version they wish. If we are to
> >>>>> support
> >>>>>>>>>>>>>>>> skipping
> >>>>>>>>>>>>>>>> versions in the future, i.e. upgrade from 1.14 to 1.17, this
> >>>>> means
> >>>>>>>>>>>>>>>> that we need to have a testing infrastructure in place that
> >>>>> would
> >>>>>>>>>>>>>>>> test all
> >>>>>>>>>>>>>>>> possible combinations of version upgrades, i.e. from 1.14 to
> >>>>> 1.15,
> >>>>>>>>>>>>>>>> from 1.14 to 1.16 and so forth, while still testing and of
> >>>>> course
> >>>>>>>>>>>>>>>> supporting all the upgrades from the previous minor version.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I like a lot the idea of introducing HINTS to define some
> >>>>>>>>>>>>>>>> behaviour in the
> >>>>>>>>>>>>>>>> programs!
> >>>>>>>>>>>>>>>> - the hints live together with the sql statements and
> >>>>> consequently
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> (JSON) plans.
> >>>>>>>>>>>>>>>> - If multiple queries are involved in a program, each one of
> >>>>> them
> >>>>>>>>>> can
> >>>>>>>>>>>>>>>> define its own config (regarding plan optimisation, not null
> >>>>>>>>>>>>>>>> enforcement,
> >>>>>>>>>>>>>>>> etc)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I agree with Francesco on his argument regarding the *JSON*
> >>>>>>>>>>>>>>>> plan. I
> >>>>>>>>>>>>>>>> believe we should already provide flexibility here, since
> >> (who
> >>>>>>>>>>>>>>>> knows) in
> >>>>>>>>>>>>>>>> the future
> >>>>>>>>>>>>>>>> a JSON plan might not fulfil the desired functionality.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also agree that we need some very obvious way (i.e. not
> >> log
> >>>>>>>>>>>>>>>> entry) to
> >>>>>>>>>>>>>>>> show the users that their program doesn't support version
> >>>>>>>>>>>>>>>> upgrades, and
> >>>>>>>>>>>>>>>> prevent them from being negatively surprised in the future,
> >>>>> when
> >>>>>>>>>>>>>>>> trying to
> >>>>>>>>>>>>>>>> upgrade their production pipelines.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This is an implementation detail, but I'd like to add that
> >>>>> there
> >>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>> some good logging in place when the upgrade is taking place,
> >>>>> to be
> >>>>>>>>>>>>>>>> able to track every restoration action, and help debug any
> >>>>>>>>>>>>>>>> potential
> >>>>>>>>>>>>>>>> issues arising from that.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann
> >>>>>>>>>>>>>>>> <trohrm...@apache.org
> >>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for writing this FLIP Timo. I think this will be a
> >>>>> very
> >>>>>>>>>>>>>>>>> important
> >>>>>>>>>>>>>>>>> improvement for Flink and our SQL user :-)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Similar to Francesco I would like to understand the
> >> statement
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades
> >>>>> use a
> >>>>>>>>>>>>>>>>>> step
> >>>>>>>>>>>>>>>>> size
> >>>>>>>>>>>>>>>>> of a single
> >>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor versions
> >>>>> (e.g.
> >>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> a bit better. Is it because Flink does not guarantee that a
> >>>>>>>>>>>>>>>>> savepoint
> >>>>>>>>>>>>>>>>> created by version 1.x can be directly recovered by version
> >>>>> 1.y
> >>>>>>>>>>>>>>>>> with x + 1
> >>>>>>>>>>>>>>>>> < y but users might have to go through a cascade of
> >> upgrades?
> >>>>>>>>>>>>>>>>>     From how I
> >>>>>>>>>>>>>>>>> understand your proposal, the compiled plan won't be
> >> changed
> >>>>>>>>>>>>>>>>> after being
> >>>>>>>>>>>>>>>>> written initially. Hence, I would assume that for the plan
> >>>>> alone
> >>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>> have to give backwards compatibility guarantees for all
> >>>>> versions.
> >>>>>>>>>>>>>>>>> Am I
> >>>>>>>>>>>>>>>>> understanding this part correctly?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <
> >>>>>>>>>>>>>>>>> france...@ververica.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for putting this amazing work together, I have some
> >>>>>>>>>>>>>>>>>> considerations/questions
> >>>>>>>>>>>>>>>>>> about the FLIP:
> >>>>>>>>>>>>>>>>>> *Proposed changes #6*: Other than defining this rule of
> >>>>> thumb,
> >>>>>>>>>>>>>>>>>> we must
> >>>>>>>>>>>>>>>>>> also make sure
> >>>>>>>>>>>>>>>>>> that compiling plans with these objects that cannot be
> >>>>>>>>>>>>>>>>>> serialized in the
> >>>>>>>>>>>>>>>>>> plan must fail hard,
> >>>>>>>>>>>>>>>>>> so users don't bite themselves with such issues, or at
> >>>>> least we
> >>>>>>>>>>>>>>>>>> need to
> >>>>>>>>>>>>>>>>>> output warning
> >>>>>>>>>>>>>>>>>> logs. In general, whenever the user is trying to use the
> >>>>>>>>>>>>>>>>>> CompiledPlan
> >>>>>>>>>>>>>>>>> APIs
> >>>>>>>>>>>>>>>>>> and at the same
> >>>>>>>>>>>>>>>>>> time, they're trying to do something "illegal" for the
> >>>>> plan, we
> >>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>> immediately either
> >>>>>>>>>>>>>>>>>> log or fail depending on the issue, in order to avoid any
> >>>>>>>>>>>>>>>>>> surprises once
> >>>>>>>>>>>>>>>>>> the user upgrades.
> >>>>>>>>>>>>>>>>>> I would also say the same for things like registering a
> >>>>>>>>>>>>>>>>>> function,
> >>>>>>>>>>>>>>>>>> registering a DataStream,
> >>>>>>>>>>>>>>>>>> and for every other thing which won't end up in the plan,
> >> we
> >>>>>>>>>>>>>>>>>> should log
> >>>>>>>>>>>>>>>>>> such info to the
> >>>>>>>>>>>>>>>>>> user by default.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *General JSON Plan Assumptions #9:* When thinking to
> >>>>> connectors
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> formats, I think
> >>>>>>>>>>>>>>>>>> it's reasonable to assume and keep out of the feature
> >> design
> >>>>>>>>>>>>>>>>>> that no
> >>>>>>>>>>>>>>>>>> feature/ability can
> >>>>>>>>>>>>>>>>>> deleted from a connector/format. I also don't think new
> >>>>>>>>>>>>>>>>> features/abilities
> >>>>>>>>>>>>>>>>>> can influence
> >>>>>>>>>>>>>>>>>> this FLIP as well, given the plan is static, so if for
> >>>>> example,
> >>>>>>>>>>>>>>>>>> MyCoolTableSink in the next
> >>>>>>>>>>>>>>>>>> flink version implements SupportsProjectionsPushDown, then
> >>>>> it
> >>>>>>>>>>>>>>>>>> shouldn't
> >>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> a problem
> >>>>>>>>>>>>>>>>>> for the upgrade story since the plan is still configured
> >> as
> >>>>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>> the previous flink
> >>>>>>>>>>>>>>>>>> version. What worries me is breaking changes, in
> >> particular
> >>>>>>>>>>>>>>>>>> behavioural
> >>>>>>>>>>>>>>>>>> changes that
> >>>>>>>>>>>>>>>>>> might happen in connectors/formats. Although this argument
> >>>>>>>>>>>>>>>>>> doesn't seem
> >>>>>>>>>>>>>>>>>> relevant for
> >>>>>>>>>>>>>>>>>> the connectors shipped by the flink project itself,
> >> because
> >>>>> we
> >>>>>>>>>>>>>>>>>> try to
> >>>>>>>>>>>>>>>>> keep
> >>>>>>>>>>>>>>>>>> them as stable as
> >>>>>>>>>>>>>>>>>> possible and avoid eventual breaking changes, it's
> >>>>> compelling to
> >>>>>>>>>>>>>>>>> external
> >>>>>>>>>>>>>>>>>> connectors and
> >>>>>>>>>>>>>>>>>> formats, which might be decoupled from the flink release
> >>>>> cycle
> >>>>>>>>>>>>>>>>>> and might
> >>>>>>>>>>>>>>>>>> have different
> >>>>>>>>>>>>>>>>>> backward compatibility guarantees. It's totally reasonable
> >>>>> if we
> >>>>>>>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>>> want to tackle it in
> >>>>>>>>>>>>>>>>>> this first iteration of the feature, but it's something we
> >>>>> need
> >>>>>>>>>>>>>>>>>> to keep
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> mind for the future.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *Functions:* It's not clear to me what you mean for
> >>>>>>>>>>>>>>>>>> "identifier",
> >>>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>> then somewhere
> >>>>>>>>>>>>>>>>>> else in the same context you talk about "name". Are we
> >>>>> talking
> >>>>>>>>>>>>>>>>>> about the
> >>>>>>>>>>>>>>>>>> function name
> >>>>>>>>>>>>>>>>>> or the function complete signature? Let's assume for
> >>>>> example we
> >>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>>>>> definitions:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT)
> >>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING)
> >>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> These for me are very different functions with different
> >>>>>>>>>>>>>>>>> implementations,
> >>>>>>>>>>>>>>>>>> where each of
> >>>>>>>>>>>>>>>>>> them might evolve separately at a different pace. Hence
> >>>>> when we
> >>>>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>> in the json
> >>>>>>>>>>>>>>>>>> plan we should perhaps use a logically defined unique id
> >>>>> like
> >>>>>>>>>>>>>>>>>> /bigIntToTimestamp/, /
> >>>>>>>>>>>>>>>>>> stringToTimestamp/ and /stringToTimestampWithFormat/. This
> >>>>> also
> >>>>>>>>>>>>>>>>>> solves
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> issue of
> >>>>>>>>>>>>>>>>>> correctly referencing the functions when restoring the
> >> plan,
> >>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>> running again the
> >>>>>>>>>>>>>>>>>> inference logic (which might have been changed in the
> >>>>> meantime)
> >>>>>>>>>>>>>>>>>> and it
> >>>>>>>>>>>>>>>>>> might also solve
> >>>>>>>>>>>>>>>>>> the versioning, that is the function identifier can
> >> contain
> >>>>> the
> >>>>>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>>>>> version like /
> >>>>>>>>>>>>>>>>>> stringToTimestampWithFormat_1_1 /or
> >>>>>>>>>>>>>>>>>> /stringToTimestampWithFormat_1_2/.
> >>>>>>>>>>>>>>>>> An
> >>>>>>>>>>>>>>>>>> alternative could be to use the string signature
> >>>>> representation,
> >>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>> might not be trivial
> >>>>>>>>>>>>>>>>>> to compute, given the complexity of our type inference
> >>>>> logic.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *The term "JSON plan"*: I think we should rather keep JSON
> >>>>> out
> >>>>>>>>>>>>>>>>>> of the
> >>>>>>>>>>>>>>>>>> concept and just
> >>>>>>>>>>>>>>>>>> name it "Compiled Plan" (like the proposed API) or
> >> something
> >>>>>>>>>>>>>>>>>> similar,
> >>>>>>>>>>>>>>>>> as I
> >>>>>>>>>>>>>>>>>> see how in
> >>>>>>>>>>>>>>>>>> future we might decide to support/modify our persistence
> >>>>>>>>>>>>>>>>>> format to
> >>>>>>>>>>>>>>>>>> something more
> >>>>>>>>>>>>>>>>>> efficient storage wise like BSON. For example, I would
> >>>>> rename /
> >>>>>>>>>>>>>>>>>> CompiledPlan.fromJsonFile/ to simply
> >>>>> /CompiledPlan.fromFile/.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *Who is the owner of the plan file?* I asked myself this
> >>>>>>>>>>>>>>>>>> question when
> >>>>>>>>>>>>>>>>>> reading this:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades
> >>>>> use a
> >>>>>>>>>>>>>>>>>>> step
> >>>>>>>>>>>>>>>>>> size of a single
> >>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor versions
> >>>>> (e.g.
> >>>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> My understanding of this statement is that a user can
> >>>>> upgrade
> >>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>> minors but then
> >>>>>>>>>>>>>>>>>> following all the minors, the same query can remain up and
> >>>>>>>>>> running.
> >>>>>>>>>>>>>>>>> E.g. I
> >>>>>>>>>>>>>>>>>> upgrade from
> >>>>>>>>>>>>>>>>>> 1.15 to 1.16, and then from 1.16 to 1.17 and I still
> >> expect
> >>>>> my
> >>>>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>> query to work
> >>>>>>>>>>>>>>>>>> without recomputing the plan. This necessarily means that
> >> at
> >>>>>>>>>>>>>>>>>> some point
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> future
> >>>>>>>>>>>>>>>>>> releases we'll need some basic "migration" tool to keep
> >> the
> >>>>>>>>>>>>>>>>>> queries up
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> running,
> >>>>>>>>>>>>>>>>>> ending up modifying the compiled plan. So I guess flink
> >>>>> should
> >>>>>>>>>>>>>>>>>> write it
> >>>>>>>>>>>>>>>>>> back in the original
> >>>>>>>>>>>>>>>>>> plan file, perhaps doing a backup of the previous one? Can
> >>>>> you
> >>>>>>>>>>>>>>>>>> please
> >>>>>>>>>>>>>>>>>> clarify this aspect?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Except these considerations, the proposal looks good to me
> >>>>>>>>>>>>>>>>>> and I'm
> >>>>>>>>>>>>>>>>> eagerly
> >>>>>>>>>>>>>>>>>> waiting to see
> >>>>>>>>>>>>>>>>>> it in play.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> FG
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Francesco Guardiani | Software Engineer
> >>>>>>>>>>>>>>>>>> france...@ververica.com[1]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Follow us @VervericaData
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Join Flink Forward[2] - The Apache Flink Conference
> >>>>>>>>>>>>>>>>>> Stream Processing | Event Driven | Real Time
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin,
> >> Germany
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Ververica GmbH
> >>>>>>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >>>>>>>>>>>>>>>>>> Managing Directors: Karl Anton Wehner, Holger Temme, Yip
> >>>>> Park
> >>>>>>>>>>>>>>>>>> Tung
> >>>>>>>>>>>>>>>>> Jason,
> >>>>>>>>>>>>>>>>>> Jinwei (Kevin)
> >>>>>>>>>>>>>>>>>> Zhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>>>> [1] mailto:france...@ververica.com
> >>>>>>>>>>>>>>>>>> [2] https://flink-forward.org/
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Marios
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >>
> >
>

Reply via email to