Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-09 Thread Timo Walther

Hi Jing and Godfrey,

I had another iteration over the document. There are two major changes:

1. Supported Flink Upgrade Versions

I got the feedback via various channels that a step size of one minor 
version is not very convenient. As you said, "because upgrading to a new 
version is a time-consuming process". I rephrased this section:

Upgrading usually involves work, which is why many users perform this 
task rarely (e.g. only once per year). Also, skipping versions is common 
until a new feature has been introduced for which it is worth upgrading. 
We will support the upgrade to the most recent Flink version from a set 
of previous versions. We aim to support upgrades from the last 2-3 
releases on a best-effort basis; maybe even more depending on the 
maintenance overhead. However, in order to not grow the testing matrix 
infinitely and to perform important refactoring if necessary, we only 
guarantee upgrades with a step size of a single minor version (i.e. a 
cascade of upgrades).


2. Annotation Design

I also adopted the multiple-annotations design for the previous 
`supportedPlanFormat`. So there is no array of versions anymore. I reworked 
the section; please have a look at the updated examples:


https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI

I also got the feedback offline that `savepoint` might not be the right 
terminology for the annotation. I changed that to `minPlanVersion` and 
`minStateVersion`.
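
For illustration, a node annotated with the renamed attributes could look 
roughly like this (a sketch only; the node name and the enum values are 
assumptions, not taken from the document):

// Sketch only: shows the renamed attributes minPlanVersion / minStateVersion.
@ExecNodeMetadata(
    name = "stream-exec-calc",            // assumed node name
    version = 1,
    minPlanVersion = FlinkVersion.v1_15,  // oldest plan format this node can be restored from
    minStateVersion = FlinkVersion.v1_15  // oldest state layout this node can be restored from
)
class StreamExecCalc {
  // ...
}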


Let me know what you think.

Regards,
Timo



On 09.12.21 08:44, Jing Zhang wrote:

Hi Timo,
Thanks a lot for driving this discussion.
I believe it could solve many problems that we are suffering from when upgrading.

I only have a small complaint about the following point.


For simplification of the design, we assume that upgrades use a step size
of a single minor version. We don't guarantee skipping minor versions
(e.g. 1.11 to 1.14).

In our internal production environment, we follow up with the community's
latest stable release version almost once a year because upgrading to a new
version is a time-consuming process.
So we might have missed 1~3 versions by the time we upgrade to the latest
version. This might also be the case in other companies.
Could we guarantee that FLIP-190 works if we skip fewer minor versions than a
specified threshold?
Then we could know which version is good for us when preparing an upgrade.

Best,
Jing Zhang

godfrey he wrote on Wed, Dec 8, 2021 at 22:16:


Hi Timo,

Thanks for the explanation, it's much clearer now.

One thing I want to confirm about `supportedPlanFormat`
and `supportedSavepointFormat`:
`supportedPlanFormat` supports multiple versions,
while `supportedSavepointFormat` supports only one version?
A JSON plan can be deserialized by multiple versions
because default values will be set for new fields.
In theory, a savepoint can be restored by more than one version
of the operators even if a state layout is changed,
such as deleting a whole state and starting the job with
`allowNonRestoredState`=true.
I think this is a corner case, and it's hard to understand compared
to `supportedPlanFormat` supporting multiple versions.
So, for most cases, when the state layout is changed, the savepoint is
incompatible, and `supportedSavepointFormat` and the version need to be
changed.
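
For reference, a minimal sketch of resuming a job with `allowNonRestoredState`
enabled via configuration (the savepoint path is a placeholder; the config keys
are Flink's existing savepoint options, nothing specific to this FLIP):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreIgnoringUnclaimedState {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // placeholder savepoint path
    conf.setString("execution.savepoint.path", "file:///tmp/savepoints/savepoint-1");
    // corresponds to allowNonRestoredState: drop state that no operator claims anymore
    conf.setBoolean("execution.savepoint.ignore-unclaimed-state", true);

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
    // ... build and execute the Table API / SQL pipeline on top of env ...
  }
}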

I think we need a detailed explanation about the annotation change story in
the Javadoc of the `ExecNodeMetadata` class for all developers
(esp. those unfamiliar with this part).

Best,
Godfrey

Timo Walther wrote on Wed, Dec 8, 2021 at 4:57 PM:


Hi Wenlong,

thanks for the feedback. Great that we reached consensus here. I will
update the entire document with my previous example shortly.

  > if we don't update the version when plan format changes, we can't
find that the plan can't be deserialized in 1.15

This should not be a problem as the entire plan file has a version as
well. We should not allow reading a 1.16 plan in 1.15. We can throw a
helpful exception early.

Reading a 1.15 plan in 1.16 is possible until we drop the old
`supportedPlanFormat` from one of the used ExecNodes. Afterwards, all
`supportedPlanFormat` values of ExecNodes must be equal to or higher than
the plan version.

Regards,
Timo

On 08.12.21 03:07, wenlong.lwl wrote:

Hi, Timo, +1 for multi metadata.

The compatible change I mean in the last email is the slight state change
example you gave, so we have got consensus on this actually, IMO.

Another question based on the example you gave:
In the example "JSON node gets an additional property in 1.16", if we don't
update the version when plan format changes, we can't find that the plan
can't be deserialized in 1.15, although the savepoint state is compatible.
The error message may not be so friendly if we just throw a deserialization
failure.

On Tue, 7 Dec 2021 at 16:49, Timo Walther  wrote:


Hi Wenlong,

   > First, we add a newStateLayout because of some improvement in state, in
   > order to keep compatibility we may still keep 

[jira] [Created] (FLINK-25230) Harden type serialization in JSON plan

2021-12-09 Thread Timo Walther (Jira)
Timo Walther created FLINK-25230:


 Summary: Harden type serialization in JSON plan
 Key: FLINK-25230
 URL: https://issues.apache.org/jira/browse/FLINK-25230
 Project: Flink
  Issue Type: Sub-task
Reporter: Timo Walther


1. Introduce two representations for LogicalType

Compact one (using asSerializableString):

{code}
// compact one
outputType: "ROW"

// full one for all kinds of logical types (time attributes, char(0), inline 
structured, etc.)
outputType: {
  "root" : "ROW",
  "nullable" : true,
  "fields" : [ {
"i" : "INT"
  }, {
"s" : "VARCHAR(2147483647)"
  }]
}
{code}

2. Drop support of legacy types and symbol classes which should not be part of 
the plan

3. Rework DataView support (shorten, remove concrete classes, support any 
external type in accumulators)

4. Implement a DataTypeJsonDeSerializer

5. Replace RelDataTypeJsonDeSerializer with LogicalTypeJsonDeSerializer
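
A rough sketch of how points 1 and 5 could fit together (not the final 
implementation; the predicate for when the compact form is sufficient is a 
placeholder):

{code}
// Rough sketch only: emit the compact form via asSerializableString() where
// possible and fall back to a full JSON object for logical types that cannot
// be expressed that way (time attributes, char(0), inline structured, etc.).
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.flink.table.types.logical.LogicalType;

import java.io.IOException;

public class LogicalTypeJsonSerializer extends JsonSerializer<LogicalType> {

    @Override
    public void serialize(LogicalType type, JsonGenerator gen, SerializerProvider provider)
            throws IOException {
        if (hasCompactRepresentation(type)) {
            // compact one
            gen.writeString(type.asSerializableString());
        } else {
            // full one
            gen.writeStartObject();
            gen.writeStringField("root", type.getTypeRoot().name());
            gen.writeBooleanField("nullable", type.isNullable());
            // ... type-specific fields, e.g. "fields" for ROW ...
            gen.writeEndObject();
        }
    }

    private static boolean hasCompactRepresentation(LogicalType type) {
        // placeholder predicate; the exact rules are part of this ticket
        return true;
    }
}
{code}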



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25221) Allow global table options for all table connectors

2021-12-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-25221:


 Summary: Allow global table options for all table connectors
 Key: FLINK-25221
 URL: https://issues.apache.org/jira/browse/FLINK-25221
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Timo Walther


Some options (e.g. `sink.parallelism`) should be defined globally for all 
connectors. This would avoid implementing the same option for each connector 
individually.
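
A rough sketch of how such a shared option could be declared once and consumed 
by any connector factory (illustrative only; the concrete mechanism for exposing 
"global" options is the subject of this ticket):

{code}
// Hypothetical sketch only: declares a shared option once so that connector
// factories no longer have to re-declare it individually.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

import java.util.Optional;

public final class GlobalConnectorOptions {

    // declared once instead of per connector
    public static final ConfigOption<Integer> SINK_PARALLELISM =
            ConfigOptions.key("sink.parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Custom parallelism of the sink operator.");

    // a connector factory could simply read the shared option from its options
    public static Optional<Integer> getSinkParallelism(ReadableConfig options) {
        return options.getOptional(SINK_PARALLELISM);
    }

    private GlobalConnectorOptions() {}
}
{code}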



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-08 Thread Timo Walther

Hi Wenlong,

thanks for the feedback. Great that we reached consensus here. I will 
update the entire document with my previous example shortly.


> if we don't update the version when plan format changes, we can't 
find that the plan can't be deserialized in 1.15


This should not be a problem as the entire plan file has a version as 
well. We should not allow reading a 1.16 plan in 1.15. We can throw a 
helpful exception early.


Reading a 1.15 plan in 1.16 is possible until we drop the old 
`supportedPlanFormat` from one of the used ExecNodes. Afterwards, all 
`supportedPlanFormat` values of ExecNodes must be equal to or higher than 
the plan version.
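
For illustration, such an early check could look roughly like this (a sketch 
with assumed names, not the actual implementation):

// Hypothetical sketch: reject plans compiled by a newer Flink version with a
// helpful message instead of failing later during ExecNode deserialization.
final class PlanVersionCheck {

  static void checkReadable(String planFlinkVersion, String currentFlinkVersion) {
    if (compareMajorMinor(planFlinkVersion, currentFlinkVersion) > 0) {
      throw new IllegalStateException(
          "The plan was compiled with Flink " + planFlinkVersion
              + " and cannot be restored with Flink " + currentFlinkVersion
              + ". Plans can only be read by the same or a newer Flink version.");
    }
  }

  private static int compareMajorMinor(String a, String b) {
    // minimal "major.minor" comparison for the sketch
    String[] pa = a.split("\\.");
    String[] pb = b.split("\\.");
    int major = Integer.compare(Integer.parseInt(pa[0]), Integer.parseInt(pb[0]));
    return major != 0 ? major : Integer.compare(Integer.parseInt(pa[1]), Integer.parseInt(pb[1]));
  }
}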


Regards,
Timo

On 08.12.21 03:07, wenlong.lwl wrote:

Hi, Timo,  +1 for multi metadata.

The compatible change I mean in the last email is the slight state change
example you gave, so we have got  consensus on this actually, IMO.

Another question based on the example you gave:
In the example "JSON node gets an additional property in 1.16", if we don't
update the version when plan format changes, we can't find that the plan
can't be deserialized in 1.15, although the savepoint state is
compatible.
The error message may not be so friendly if we just throw a deserialization
failure.

On Tue, 7 Dec 2021 at 16:49, Timo Walther  wrote:


Hi Wenlong,

  > First,  we add a newStateLayout because of some improvement in state, in
  > order to keep compatibility we may still keep the old state for the
first
  > version. We need to update the version, so that we can generate a new
  > version plan for the new job and keep the exec node compatible with
the old
  > version plan.

The problem that I see here for contributors is that the actual update
of a version is more complicated than just updating an integer value. It
means copying a lot of ExecNode code for a change that happens locally
in an operator. Let's assume multiple ExecNodes use a similar operator.
Why do we need to update all ExecNode versions, if the operator itself
can deal with the incompatibility. The ExecNode version is meant for
topology changes or fundamental state changes.

If we don't find consensus on this topic, I would at least vote for
supporting multiple annotations for an ExecNode class. This way we don't
need to copy code but only add two ExecNode annotations with different
ExecNode versions.

  > Maybe we can add support for this case :
  > when an exec node is changed in 1.16, but is compatible with 1.15,
  > we can use the node of 1.16 to deserialize the plan of 1.15.

If the ExecNode is compatible, there is no reason to increase the
ExecNode version.



I tried to come up with a reworked solution to make all parties happy:

1. Let's assume the following annotations:

supportedPlanFormat = [1.15]

supportedSavepointFormat = 1.15

we drop `added` as it is equal to `supportedSavepointFormat`

2. Multiple annotations over ExecNodes are possible:

// operator state changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)

// state layout changed slightly in 1.16
// - operator migration is possible
// - operator supports state of both versions and will perform operator
state migration
// - new plans will get new ExecNode version
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
supportedSavepointFormat=1.16)

// we force a plan migration in 1.17
// - we assume that all operator states have been migrated in the
previous version
// - we can safely replace the old version `1` with `2` and only keep
the new savepoint format
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
supportedSavepointFormat=1.16)


// plan changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)

// JSON node gets an additional property in 1.16
// e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false}
// - ExecNode version does not change
// - ExecNode version only changes when topology or state is affected
// - we support both JSON plan formats, the old and the newest one
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16],
supportedSavepointFormat=1.15)

// we force a plan migration in 1.17
// - now we only support 1.16 plan format
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16,
supportedSavepointFormat=1.15)


// topology change

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
supportedSavepointFormat=1.15)

// complete new class structure in 1.16 annotated with
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
supportedSavepointFormat=1.16)



What do you think?


Regards,
Timo









On 07.12.21 08:20, wenlong.lwl wrote:

Maybe we can add support for this case :
  when an exec node is changed in 1.16, b

[jira] [Created] (FLINK-25217) FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-07 Thread Timo Walther (Jira)
Timo Walther created FLINK-25217:


 Summary: FLIP-190: Support Version Upgrades for Table API & SQL 
Programs
 Key: FLINK-25217
 URL: https://issues.apache.org/jira/browse/FLINK-25217
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Reporter: Timo Walther


Nowadays, the Table & SQL API is as important to Flink as the DataStream API. 
It is one of the main abstractions for expressing pipelines that perform 
stateful stream processing. Users expect the same backwards compatibility 
guarantees when upgrading to a newer Flink version as with the DataStream API.

In particular, this means:

* once the operator topology is defined, it remains static and does not change 
between Flink versions, unless resulting in better performance,
* business logic (defined using expressions and functions in queries) behaves 
identical as before the version upgrade,
* the state of a Table & SQL API program can be restored from a savepoint of a 
previous version,
* adding or removing stateful operators should be made possible in the 
DataStream API.

The same query can remain up and running after upgrades.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-07 Thread Timo Walther

Hi Wenlong,

> First,  we add a newStateLayout because of some improvement in state, in
> order to keep compatibility we may still keep the old state for the first
> version. We need to update the version, so that we can generate a new
> version plan for the new job and keep the exec node compatible with the old
> version plan.

The problem that I see here for contributors is that the actual update 
of a version is more complicated than just updating an integer value. It 
means copying a lot of ExecNode code for a change that happens locally 
in an operator. Let's assume multiple ExecNodes use a similar operator. 
Why do we need to update all ExecNode versions, if the operator itself 
can deal with the incompatibility. The ExecNode version is meant for 
topology changes or fundamental state changes.


If we don't find consensus on this topic, I would at least vote for 
supporting multiple annotations for an ExecNode class. This way we don't 
need to copy code but only add two ExecNode annotations with different 
ExecNode versions.


> Maybe we can add support for this case :
> when an exec node is changed in 1.16, but is compatible with 1.15,
> we can use the node of 1.16 to deserialize the plan of 1.15.

If the ExecNode is compatible, there is no reason to increase the 
ExecNode version.




I tried to come up with a reworked solution to make all parties happy:

1. Let's assume the following annotations:

supportedPlanFormat = [1.15]

supportedSavepointFormat = 1.15

we drop `added` as it is equal to `supportedSavepointFormat`

2. Multiple annotations over ExecNodes are possible:

// operator state changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, 
supportedSavepointFormat=1.15)


// state layout changed slightly in 1.16
// - operator migration is possible
// - operator supports state of both versions and will perform operator 
state migration

// - new plans will get new ExecNode version
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, 
supportedSavepointFormat=1.15)
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, 
supportedSavepointFormat=1.16)


// we force a plan migration in 1.17
// - we assume that all operator states have been migrated in the 
previous version
// - we can safely replace the old version `1` with `2` and only keep 
the new savepoint format
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, 
supportedSavepointFormat=1.16)



// plan changes

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, 
supportedSavepointFormat=1.15)


// JSON node gets an additional property in 1.16
// e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false}
// - ExecNode version does not change
// - ExecNode version only changes when topology or state is affected
// - we support both JSON plan formats, the old and the newest one
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16], 
supportedSavepointFormat=1.15)


// we force a plan migration in 1.17
// - now we only support 1.16 plan format
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16, 
supportedSavepointFormat=1.15)



// topology change

// initial introduction in 1.15
@ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, 
supportedSavepointFormat=1.15)


// complete new class structure in 1.16 annotated with
@ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, 
supportedSavepointFormat=1.16)




What do you think?


Regards,
Timo









On 07.12.21 08:20, wenlong.lwl wrote:

Maybe we can add support for this case :
 when an exec node is changed in 1.16, but is compatible with 1.15,
we can use the node of 1.16 to deserialize the plan of 1.15.
By this way, we don't need to fork the code if the change is compatible,
and can avoid fork code frequently.


Best,
Wenlong


On Tue, 7 Dec 2021 at 15:08, wenlong.lwl  wrote:


hi, Timo, I would prefer to update the version every time we change the
state layer too.

It could be possible that we change the exec node in 2 steps:
First,  we add a newStateLayout because of some improvement in state, in
order to keep compatibility we may still keep the old state for the first
version. We need to update the version, so that we can generate a new
version plan for the new job and keep the exec node compatible with the old
version plan.
After some versions, we may remove the old version state layout and clean
up the deprecated code. We still need to update the version, so that we can
verify that we are compatible with the plan after the first change, but not
compatible with the plan earlier.


Best,
Wenlong

On Mon, 6 Dec 2021 at 21:27, Timo Walther  wrote:


Hi Godfrey,

  > design makes things more complex.

Yes, the design might be a bit more complex. But operator migration is
way easier than ExecNode migration at a later point in time for code
maintenance. We know that ExecNodes can beco

[jira] [Created] (FLINK-25199) fromValues does not emit final MAX watermark

2021-12-06 Thread Timo Walther (Jira)
Timo Walther created FLINK-25199:


 Summary: fromValues does not emit final MAX watermark
 Key: FLINK-25199
 URL: https://issues.apache.org/jira/browse/FLINK-25199
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Timo Walther


It seems that {{fromValues}} generating multiple rows does not emit any 
watermarks:

{code}
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table inputTable =
tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("weight", DataTypes.DOUBLE()),
DataTypes.FIELD("f0", DataTypes.STRING()),
DataTypes.FIELD("f1", DataTypes.DOUBLE()),
DataTypes.FIELD("f2", DataTypes.DOUBLE()),
DataTypes.FIELD("f3", DataTypes.DOUBLE()),
DataTypes.FIELD("f4", DataTypes.INT()),
DataTypes.FIELD("label", DataTypes.STRING())),
Row.of(1., "a", 1., 1., 1., 2, "l1"),
Row.of(1., "a", 1., 1., 1., 2, "l1"));

DataStream<Row> input = tEnv.toDataStream(inputTable);
{code}

{{fromValues(1, 2, 3)}} or {{fromValues}} with only 1 row works correctly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Deprecate Java 8 support

2021-12-06 Thread Timo Walther

+1 (binding)

Thanks,
Timo

On 06.12.21 17:28, David Morávek wrote:

+1 (non-binding)

On Mon, Dec 6, 2021 at 4:55 PM Ingo Bürk  wrote:


+1 (non-binding)


Ingo

On Mon, Dec 6, 2021 at 4:44 PM Chesnay Schepler 
wrote:


Hello,

after recent discussions on the dev and user
mailing lists to deprecate Java 8 support, with a general consensus in
favor of it, I would now like to do a formal vote.

The deprecation would entail a notification to our users to encourage
migrating to Java 11, and various efforts on our side to prepare a
migration to Java 11, like updating some e2e tests to actually run on
Java 11, performance benchmarking, etc.

There is no set date for the removal of Java 8 support.

We'll use the usual minimum 72h vote duration, with committers having
binding votes.










Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-06 Thread Timo Walther

Hi Godfrey,

> design makes things more complex.

Yes, the design might be a bit more complex. But operator migration is 
way easier than ExecNode migration at a later point in time for code 
maintenance. We know that ExecNodes can become pretty complex. Even 
though we have put a lot of code into `CommonXXExecNode` it will be a 
lot of work to maintain multiple versions of ExecNodes. If we can avoid 
this with operator state migration, this should always be preferred over 
a new ExecNode version.


I'm aware that operator state migration might only be important for 
roughly 10 % of all changes. A new ExecNode version will be used for 90% 
of all changes.


> If there are multiple state layouts, which layout the ExecNode should use?


It is not the responsibility of the ExecNode to decide this but the 
operator. Something like:


class X extends ProcessFunction {
  ValueState oldStateLayout;
  ValueState newStateLayout;

  open() {
if (oldStateLayout.get() != null) {
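      // state written by the previous operator version is present -> migrate it into the new layout once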
  performOperatorMigration();
}
useNewStateLayout();
  }
}

Operator migration is meant for smaller "more local" changes without 
touching the ExecNode layer. The CEP library and DataStream API sources 
are performing operator migration for years already.



> `supportedPlanChanges` and `supportedSavepointChanges` are a bit 
obscure.


Let me try to come up with more examples why I think both annotations 
make sense and are esp. important *for test coverage*.


supportedPlanChanges:

Let's assume we have some JSON in Flink 1.15:

{
  some-prop: 42
}

And we want to extend the JSON in Flink 1.16:

{
  some-prop: 42,
  some-flag: false
}

Maybe we don't need to increase the ExecNode version but only ensure 
that the flag is set to `false` by default for the older versions.


We need a location to track changes and document the changelog. With the 
help of the annotation supportedPlanChanges = [1.15, 1.16] we can verify 
that we have tests for both JSON formats.


And once we decide to drop the 1.15 format, we enforce plan migration 
and fill in the default value `false` into the old plans and bump their 
JSON plan version to 1.16 or higher.
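
As a small illustration (assuming the plan is read with Jackson; the names are 
made up for the example), defaulting the missing flag when reading an old 1.15 
plan could look like:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class PlanNodeUpgradeSketch {
  // Sketch only: read the 1.15 JSON node and default the new 1.16 flag to false.
  static boolean readSomeFlag(JsonNode execNodeJson) {
    return execNodeJson.has("some-flag") && execNodeJson.get("some-flag").asBoolean();
  }

  public static void main(String[] args) throws Exception {
    JsonNode oldNode = new ObjectMapper().readTree("{ \"some-prop\": 42 }");
    System.out.println(readSomeFlag(oldNode)); // false for old plans
  }
}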




> once the state layout is changed, the ExecNode version also needs to be 
updated


This will still be the majority of cases. But if we can avoid this, we 
should do it for not having too much duplicate code to maintain.




Thanks,
Timo


On 06.12.21 09:58, godfrey he wrote:

Hi, Timo,

Thanks for the detailed explanation.


We change an operator state of B in Flink 1.16. We perform the change in the 
operator of B in a way to support both state layouts. Thus, no need for a new 
ExecNode version.


I think this design makes things more complex.
1. If there are multiple state layouts, which layout the ExecNode should use?
It increases the cost of understanding for developers (especially for
Flink newcomers), making them prone to mistakes.
2. `supportedPlanChanges` and `supportedSavepointChanges` are a bit obscure.

The purpose of the ExecNode annotations is not only to support powerful
validation, but more importantly to make it easy for developers to understand
and to ensure that every modification is easy and state compatible.

I prefer that once the state layout is changed, the ExecNode version also
needs to be updated, which could make things simpler. How about
renaming `supportedPlanChanges` to `planCompatibleVersion`
(which means the plan is compatible with the plan generated by the
given version of the node) and renaming `supportedSavepointChanges` to
`savepointCompatibleVersion` (which means the state is compatible with the
state generated by the given version of the node)?
The names also indicate that only one version value can be set.

WDYT?

Best,
Godfrey









Timo Walther wrote on Thu, Dec 2, 2021 at 11:42 PM:


Response to Marios's feedback:

  > there should be some good logging in place when the upgrade is taking
place

Yes, I agree. I added this part to the FLIP.

  > config option instead that doesn't provide the flexibility to
overwrite certain plans

One can set the config option also around sections of the
multi-statement SQL script.

SET 'table.plan.force-recompile'='true';

COMPILE ...

SET 'table.plan.force-recompile'='false';

But the question is why a user wants to run COMPILE multiple times. If
it is during development, then running EXECUTE (or just the statement
itself) without calling COMPILE should be sufficient. The file can also
manually be deleted if necessary.

What do you think?

Regards,
Timo



On 02.12.21 16:09, Timo Walther wrote:

Hi Till,

Yes, you might have to. But not a new plan from the SQL query but a
migration from the old plan to the new plan. This will not happen often.
But we need a way to evolve the format of the JSON plan itself.

Maybe this confuses a bit, so let me clarify it again: Mostly ExecNode
versions and operator state layouts will evolve. Not the plan files,
those will be pretty stable. But also not infinitely.

Regards,
Timo


On 02.12.2

Re: [DISCUSS] FLIP-197: API stability graduation process

2021-12-06 Thread Timo Walther

Hi Till,

thanks for starting this discussion. I think this topic should have been 
discussed way earlier. I have two questions:


1) It might be an implementation detail, but where do you expect this 
`FlinkVersion` to be located? This is actually quite an important class 
that also needs to be made available for other use cases. For the SQL 
upgrade story we will definitely need a similar enum. And I think we 
have something similar for other tests (see `MigrationVersion`). For 
reducing release overhead, it would be good to unify all these 
"version metadata".


2) Deprecations: Shall we also start versioning deprecation decisions? 
Esp. for Experimental/PublicEvolving interfaces we should remove 
deprecated code in time. We should also let users know when we are 
planning to remove code. E.g. clearly indicate that the deprecation will 
happen in the next major version?


Regards,
Timo

On 06.12.21 10:04, Till Rohrmann wrote:

Ok, then let's increase the graduation period to 2 releases. If we see that
this is super easy for us to do, then we can shorten it in the future.

Cheers,
Till

On Mon, Dec 6, 2021 at 9:54 AM Chesnay Schepler  wrote:


Given that you can delay the graduation if there is a good reason for
it, we should be able to cover that case even if the graduation would
happen by default after 1 month.

That said, personally I would also be in favor of 2 releases; we see
plenty of users not upgrading to every single Flink version, and this
may  give us a bit more coverage.

On 06/12/2021 09:20, Ingo Bürk wrote:

Hi Till,

from my (admittedly limited) experience with how far projects lag behind in
terms of Flink versions – yes, the combined time it would take to mature
then seems reasonable enough for a sufficient adoption, IMO.

Another reason why I think two releases as a default for the last step
makes sense: say you mature an API to PublicEvolving. Typically, there will
be issues found afterwards. Even if you address these in the very next
release cycle, a duration of one release would mean you fully mature the
API in the same release in which things are still being fixed; intuitively,
it makes sense to me that the step to Public would come after a period of
no changes needed, however.


Ingo

On Fri, Dec 3, 2021 at 4:55 PM Till Rohrmann wrote:



Hi Ingo, thanks for your feedback.

Do you think that two release cycles per graduation step would be long
enough or should it be longer?

Cheers,
Till

On Fri, Dec 3, 2021 at 4:29 PM Ingo Bürk  wrote:


Hi Till,

Overall I whole-heartedly agree with the proposals in this FLIP. Thank you
for starting this discussion as well! This seems like something that could
be tested quite nicely with ArchUnit as well; I'll be happy to help should
the FLIP be accepted.


I would propose per default a single release.

The step from PublicEvolving to Public feels more important to me, and I
would personally suggest making this transition a bit longer. We have a bit
of a chicken-egg problem here, because the goal of your FLIP is,
ultimately, also to motivate faster adoption of new Flink versions, but the
status quo prevents that; if we mature APIs too quickly, we risk losing out
on important feedback. Therefore, I would propose starting slower here, and
rather think about shortening that cycle in the future.


Best
Ingo

On Thu, Dec 2, 2021 at 3:57 PM Till Rohrmann wrote:

Hi everyone,

As promised, here is the follow-up FLIP [1] for discussing how we can
ensure that newly introduced APIs are being stabilized over time. This FLIP
is related to FLIP-196 [2].

The idea of FLIP-197 is to introduce an API graduation process that forces
us to increase the API stability guarantee unless there is a very good
reason not to do so. So the proposal is to reverse the process from opt-in
(increasing the stability guarantee explicitly) to opt-out (deciding that
an API cannot be graduated with a good reason).

Since every process breaks if it is not automated, we propose a richer set
of API stability annotations that can capture enough information so that we
can implement a test that fails if we fail to follow the process.
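
As an illustration of what such an automated check could look like (an
ArchUnit-style rule, as also mentioned in this thread; the concrete rule
below is a placeholder, not the FLIP's proposal):

import com.tngtech.archunit.core.domain.JavaClasses;
import com.tngtech.archunit.core.importer.ClassFileImporter;
import com.tngtech.archunit.lang.ArchRule;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.classes;

public class StabilityGraduationCheck {
  public static void main(String[] args) {
    JavaClasses flinkClasses =
        new ClassFileImporter().importPackages("org.apache.flink");

    // Placeholder rule: fail if classes in a public "api" package are still marked @Internal.
    // A real graduation check would compare the annotation's version metadata against the
    // current release, but the failing-test mechanics are the same.
    ArchRule rule = classes()
        .that().resideInAPackage("..api..")
        .should().notBeAnnotatedWith("org.apache.flink.annotation.Internal");

    rule.check(flinkClasses);
  }
}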

Looking forward to your feedback.

Hopefully, we can provide our users a better experience when working with
Flink because we offer more stable APIs and make them available faster.


[1] https://cwiki.apache.org/confluence/x/J5eqCw
[2] https://cwiki.apache.org/confluence/x/IJeqCw

Cheers,
Till










Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-02 Thread Timo Walther

Response to Marios's feedback:

> there should be some good logging in place when the upgrade is taking 
place


Yes, I agree. I added this part to the FLIP.

> config option instead that doesn't provide the flexibility to 
overwrite certain plans


One can set the config option also around sections of the 
multi-statement SQL script.


SET 'table.plan.force-recompile'='true';

COMPILE ...

SET 'table.plan.force-recompile'='false';

But the question is why a user wants to run COMPILE multiple times. If 
it is during development, then running EXECUTE (or just the statement 
itself) without calling COMPILE should be sufficient. The file can also 
manually be deleted if necessary.


What do you think?

Regards,
Timo



On 02.12.21 16:09, Timo Walther wrote:

Hi Till,

Yes, you might have to. But not a new plan from the SQL query but a 
migration from the old plan to the new plan. This will not happen often. 
But we need a way to evolve the format of the JSON plan itself.


Maybe this confuses a bit, so let me clarify it again: Mostly ExecNode 
versions and operator state layouts will evolve. Not the plan files, 
those will be pretty stable. But also not infinitely.


Regards,
Timo


On 02.12.21 16:01, Till Rohrmann wrote:

Then for migrating from Flink 1.10 to 1.12, I might have to create a new
plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?

Cheers,
Till

On Thu, Dec 2, 2021 at 3:39 PM Timo Walther  wrote:


Response to Till's feedback:

  > compiled plan won't be changed after being written initially

This is not entirely correct. We give guarantees for keeping the query
up and running. We reserve the right to force plan migrations. In
this case, the plan might not be created from the SQL statement but from
the old plan. I have added an example in section 10.1.1. In general,
both persisted entities "plan" and "savepoint" can evolve independently
from each other.

Thanks,
Timo

On 02.12.21 15:10, Timo Walther wrote:

Response to Godfrey's feedback:

  > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.

Thanks for the hint. I added a dedicated section 7.1.3.


  > it's hard to maintain the supported versions for
"supportedPlanChanges" and "supportedSavepointChanges"

Actually, I think we are mostly on the same page.

The annotation does not need to be updated for every Flink version. As
the name suggests it is about "Changes" (in other words:
incompatibilities) that require some kind of migration. Either plan
migration (= PlanChanges) or savepoint migration (=SavepointChanges,
using operator migration or savepoint migration).

Let's assume we introduced two ExecNodes A and B in Flink 1.15.

The annotations are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15)

We change an operator state of B in Flink 1.16.

We perform the change in the operator of B in a way to support both
state layouts. Thus, no need for a new ExecNode version.

The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15, 1.16)

So the versions in the annotations are "start version"s.

I don't think we need end versions? End version would mean that we drop
the ExecNode from the code base?

Please check the section 10.1.1 again. I added a more complex example.


Thanks,
Timo



On 01.12.21 16:29, Timo Walther wrote:

Response to Francesco's feedback:

  > *Proposed changes #6*: Other than defining this rule of thumb, we
must also make sure that compiling plans with these objects that
cannot be serialized in the plan must fail hard

Yes, I totally agree. We will fail hard with a helpful exception. Any
mistake, e.g. using an inline object in Table API or an invalid
DataStream API source without uid should immediately fail a plan
compilation step. I added a remark to the FLIP again.

  > What worries me is breaking changes, in particular behavioural
changes that might happen in connectors/formats

Breaking changes in connectors and formats need to be encoded in the
options. I could also imagine versioning the factory identifier, e.g.
`connector=kafka` and `connector=kafka-2`, if this is necessary.

After thinking about your question again, I think we will also need
the same testing infrastructure for our connectors and formats. Esp.
restore tests and completeness test. I updated the document
accordingly. Also I added a way to generate UIDs for DataStream API
providers.

  > *Functions:* Are we talking about the function name or the 
function

complete signature?

For catalog functions, the identifier contains catalog name and
database name. For system functions, the identifier contains only a name,
which makes function name and identifier identical. I r

Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-02 Thread Timo Walther

Hi Till,

Yes, you might have to. But not a new plan from the SQL query but a 
migration from the old plan to the new plan. This will not happen often. 
But we need a way to evolve the format of the JSON plan itself.


Maybe this confuses a bit, so let me clarify it again: Mostly ExecNode 
versions and operator state layouts will evolve. Not the plan files, 
those will be pretty stable. But also not infinitely.


Regards,
Timo


On 02.12.21 16:01, Till Rohrmann wrote:

Then for migrating from Flink 1.10 to 1.12, I might have to create a new
plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?

Cheers,
Till

On Thu, Dec 2, 2021 at 3:39 PM Timo Walther  wrote:


Response to Till's feedback:

  > compiled plan won't be changed after being written initially

This is not entirely correct. We give guarantees for keeping the query
up and running. We reserve the right to force plan migrations. In
this case, the plan might not be created from the SQL statement but from
the old plan. I have added an example in section 10.1.1. In general,
both persisted entities "plan" and "savepoint" can evolve independently
from each other.

Thanks,
Timo

On 02.12.21 15:10, Timo Walther wrote:

Response to Godfrey's feedback:

  > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.

Thanks for the hint. I added a dedicated section 7.1.3.


  > it's hard to maintain the supported versions for
"supportedPlanChanges" and "supportedSavepointChanges"

Actually, I think we are mostly on the same page.

The annotation does not need to be updated for every Flink version. As
the name suggests it is about "Changes" (in other words:
incompatibilities) that require some kind of migration. Either plan
migration (= PlanChanges) or savepoint migration (=SavepointChanges,
using operator migration or savepoint migration).

Let's assume we introduced two ExecNodes A and B in Flink 1.15.

The annotations are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15)

We change an operator state of B in Flink 1.16.

We perform the change in the operator of B in a way to support both
state layouts. Thus, no need for a new ExecNode version.

The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
supportedSavepointChanges=1.15, 1.16)

So the versions in the annotations are "start version"s.

I don't think we need end versions? End version would mean that we drop
the ExecNode from the code base?

Please check the section 10.1.1 again. I added a more complex example.


Thanks,
Timo



On 01.12.21 16:29, Timo Walther wrote:

Response to Francesco's feedback:

  > *Proposed changes #6*: Other than defining this rule of thumb, we
must also make sure that compiling plans with these objects that
cannot be serialized in the plan must fail hard

Yes, I totally agree. We will fail hard with a helpful exception. Any
mistake, e.g. using an inline object in Table API or an invalid
DataStream API source without uid should immediately fail a plan
compilation step. I added a remark to the FLIP again.

  > What worries me is breaking changes, in particular behavioural
changes that might happen in connectors/formats

Breaking changes in connectors and formats need to be encoded in the
options. I could also imagine versioning the factory identifier, e.g.
`connector=kafka` and `connector=kafka-2`, if this is necessary.

After thinking about your question again, I think we will also need
the same testing infrastructure for our connectors and formats. Esp.
restore tests and completeness test. I updated the document
accordingly. Also I added a way to generate UIDs for DataStream API
providers.

  > *Functions:* Are we talking about the function name or the function
complete signature?

For catalog functions, the identifier contains catalog name and
database name. For system functions, the identifier contains only a name,
which makes function name and identifier identical. I reworked the
section again and also fixed some of the naming conflicts you mentioned.

  > we should perhaps use a logically defined unique id like
/bigIntToTimestamp/

I added a concrete example for the resolution and restoration. The
unique id is composed of name + version. Internally, this is
represented as `$TO_TIMESTAMP_LTZ$1`.

  > I think we should rather keep JSON out of the concept

Sounds ok to me. In SQL we also just call it "plan". I will change the
file sections. But would suggest to keep the fromJsonString method.

  > write it back in the original plan file

I updated the terminology section for what we consider an "upgrade".
We might need to update the original plan file. This is already
considered in the COMPILE PLAN ... FRO

Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-02 Thread Timo Walther

Response to Till's feedback:

> compiled plan won't be changed after being written initially

This is not entirely correct. We give guarantees for keeping the query 
up and running. We reserve the right to force plan migrations. In 
this case, the plan might not be created from the SQL statement but from 
the old plan. I have added an example in section 10.1.1. In general, 
both persisted entities "plan" and "savepoint" can evolve independently 
from each other.


Thanks,
Timo

On 02.12.21 15:10, Timo Walther wrote:

Response to Godfrey's feedback:

 > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.

Thanks for the hint. I added a dedicated section 7.1.3.


 > it's hard to maintain the supported versions for 
"supportedPlanChanges" and "supportedSavepointChanges"


Actually, I think we are mostly on the same page.

The annotation does not need to be updated for every Flink version. As 
the name suggests it is about "Changes" (in other words: 
incompatibilities) that require some kind of migration. Either plan 
migration (= PlanChanges) or savepoint migration (=SavepointChanges, 
using operator migration or savepoint migration).


Let's assume we introduced two ExecNodes A and B in Flink 1.15.

The annotations are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15)


@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15)


We change an operator state of B in Flink 1.16.

We perform the change in the operator of B in a way to support both 
state layouts. Thus, no need for a new ExecNode version.


The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15)


@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15, 1.16)


So the versions in the annotations are "start version"s.

I don't think we need end versions? End version would mean that we drop 
the ExecNode from the code base?


Please check the section 10.1.1 again. I added a more complex example.


Thanks,
Timo



On 01.12.21 16:29, Timo Walther wrote:

Response to Francesco's feedback:

 > *Proposed changes #6*: Other than defining this rule of thumb, we 
must also make sure that compiling plans with these objects that 
cannot be serialized in the plan must fail hard


Yes, I totally agree. We will fail hard with a helpful exception. Any 
mistake, e.g. using an inline object in Table API or an invalid 
DataStream API source without uid should immediately fail a plan 
compilation step. I added a remark to the FLIP again.


 > What worries me is breaking changes, in particular behavioural 
changes that might happen in connectors/formats


Breaking changes in connectors and formats need to be encoded in the 
options. I could also imagine versioning the factory identifier, e.g. 
`connector=kafka` and `connector=kafka-2`, if this is necessary.


After thinking about your question again, I think we will also need 
the same testing infrastructure for our connectors and formats. Esp. 
restore tests and completeness test. I updated the document 
accordingly. Also I added a way to generate UIDs for DataStream API 
providers.


 > *Functions:* Are we talking about the function name or the function 
complete signature?


For catalog functions, the identifier contains catalog name and 
database name. For system functions, the identifier contains only a name, 
which makes function name and identifier identical. I reworked the 
section again and also fixed some of the naming conflicts you mentioned.


 > we should perhaps use a logically defined unique id like 
/bigIntToTimestamp/


I added a concrete example for the resolution and restoration. The 
unique id is composed of name + version. Internally, this is 
represented as `$TO_TIMESTAMP_LTZ$1`.


 > I think we should rather keep JSON out of the concept

Sounds ok to me. In SQL we also just call it "plan". I will change the 
file sections. But would suggest to keep the fromJsonString method.


 > write it back in the original plan file

I updated the terminology section for what we consider an "upgrade". 
We might need to update the original plan file. This is already 
considered in the COMPILE PLAN ... FROM ... even though this is future 
work. Also savepoint migration.


Thanks for all the feedback!

Timo


On 30.11.21 14:28, Timo Walther wrote:

Response to Wenlong's feedback:

 > I would prefer not to provide such a shortcut, let users use 
COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be 
understood by new users even without inferring the docs.


I would like to hear more opinions on this topic. Personally, I find 
a combined statement very useful. Not only for quicker development 
and debugging but also for readability. It helps in keeping the JSON 
path and the query close to each other in order to know the 

Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-02 Thread Timo Walther

Response to Godfrey's feedback:

> "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.

Thanks for the hint. I added a dedicated section 7.1.3.


> it's hard to maintain the supported versions for 
"supportedPlanChanges" and "supportedSavepointChanges"


Actually, I think we are mostly on the same page.

The annotation does not need to be updated for every Flink version. As 
the name suggests it is about "Changes" (in other words: 
incompatibilities) that require some kind of migration. Either plan 
migration (= PlanChanges) or savepoint migration (=SavepointChanges, 
using operator migration or savepoint migration).


Let's assume we introduced two ExecNodes A and B in Flink 1.15.

The annotations are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15)


@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15)


We change an operator state of B in Flink 1.16.

We perform the change in the operator of B in a way to support both 
state layouts. Thus, no need for a new ExecNode version.


The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15)


@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, 
supportedSavepointChanges=1.15, 1.16)


So the versions in the annotations are "start version"s.

I don't think we need end versions? End version would mean that we drop 
the ExecNode from the code base?


Please check the section 10.1.1 again. I added a more complex example.


Thanks,
Timo



On 01.12.21 16:29, Timo Walther wrote:

Response to Francesco's feedback:

 > *Proposed changes #6*: Other than defining this rule of thumb, we 
must also make sure that compiling plans with these objects that cannot 
be serialized in the plan must fail hard


Yes, I totally agree. We will fail hard with a helpful exception. Any 
mistake, e.g. using an inline object in Table API or an invalid DataStream 
API source without uid should immediately fail a plan compilation step. 
I added a remark to the FLIP again.


 > What worries me is breaking changes, in particular behavioural 
changes that might happen in connectors/formats


Breaking changes in connectors and formats need to be encoded in the 
options. I could also imagine versioning the factory identifier, e.g. 
`connector=kafka` and `connector=kafka-2`, if this is necessary.


After thinking about your question again, I think we will also need the 
same testing infrastructure for our connectors and formats. Esp. restore 
tests and completeness test. I updated the document accordingly. Also I 
added a way to generate UIDs for DataStream API providers.


 > *Functions:* Are we talking about the function name or the function 
complete signature?


For catalog functions, the identifier contains catalog name and database 
name. For system functions, the identifier contains only a name which makes 
function name and identifier identical. I reworked the section again and 
also fixed some of the naming conflicts you mentioned.


 > we should perhaps use a logically defined unique id like 
/bigIntToTimestamp/


I added a concrete example for the resolution and restoration. The 
unique id is composed of name + version. Internally, this is represented 
as `$TO_TIMESTAMP_LTZ$1`.


 > I think we should rather keep JSON out of the concept

Sounds ok to me. In SQL we also just call it "plan". I will change the 
file sections. But would suggest to keep the fromJsonString method.


 > write it back in the original plan file

I updated the terminology section for what we consider an "upgrade". We 
might need to update the original plan file. This is already considered 
in the COMPILE PLAN ... FROM ... even though this is future work. Also 
savepoint migration.


Thanks for all the feedback!

Timo


On 30.11.21 14:28, Timo Walther wrote:

Response to Wenlong's feedback:

 > I would prefer not to provide such a shortcut, let users use  
COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be 
understood by new users even without inferring the docs.


I would like to hear more opinions on this topic. Personally, I find a 
combined statement very useful. Not only for quicker development and 
debugging but also for readability. It helps in keeping the JSON path 
and the query close to each other in order to know the origin of the 
plan.


 > but the plan and SQL are not matched. The result would be quite 
confusing if we still execute the plan directly, we may need to add a 
validation.


You are right that there could be a mismatch. But we have a similar 
problem when executing CREATE TABLE IF NOT EXISTS. The schema or 
options of a table could have changed completely in the catalog but 
the CREATE TABLE IF NOT EXISTS is not executed again. So a mismatch 
could also occur there.


Regards,
Timo

On 30.11.21 14:17, Timo Walther wrote:

Hi everyone,

Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-01 Thread Timo Walther

Response to Francesco's feedback:

> *Proposed changes #6*: Other than defining this rule of thumb, we 
must also make sure that compiling plans with these objects that cannot 
be serialized in the plan must fail hard


Yes, I totally agree. We will fail hard with a helpful exception. Any 
mistake, e.g. using an inline object in Table API or an invalid DataStream 
API source without uid should immediately fail a plan compilation step. 
I added a remark to the FLIP again.


> What worries me is breaking changes, in particular behavioural 
changes that might happen in connectors/formats


Breaking changes in connectors and formats need to be encoded in the 
options. I could also imagine versioning the factory identifier, e.g. 
`connector=kafka` and `connector=kafka-2`, if this is necessary.


After thinking about your question again, I think we will also need the 
same testing infrastructure for our connectors and formats. Esp. restore 
tests and completeness test. I updated the document accordingly. Also I 
added a way to generate UIDs for DataStream API providers.


> *Functions:* Are we talking about the function name or the function 
complete signature?


For catalog functions, the identifier contains catalog name and database 
name. For system functions, the identifier contains only a name which makes 
function name and identifier identical. I reworked the section again and 
also fixed some of the naming conflicts you mentioned.


> we should perhaps use a logically defined unique id like 
/bigIntToTimestamp/


I added a concrete example for the resolution and restoration. The 
unique id is composed of name + version. Internally, this is represented 
as `$TO_TIMESTAMP_LTZ$1`.
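
As a tiny illustration of that encoding (the helper names are made up for the 
example), composing and splitting such an identifier could look like:

// Sketch only: encode/decode "name + version" for serialized system functions.
final class SerializedFunctionId {

  // e.g. name "TO_TIMESTAMP_LTZ", version 1 -> "$TO_TIMESTAMP_LTZ$1"
  static String encode(String name, int version) {
    return "$" + name + "$" + version;
  }

  static String decodeName(String serialized) {
    return serialized.substring(1, serialized.lastIndexOf('$'));
  }

  static int decodeVersion(String serialized) {
    return Integer.parseInt(serialized.substring(serialized.lastIndexOf('$') + 1));
  }
}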


> I think we should rather keep JSON out of the concept

Sounds ok to me. In SQL we also just call it "plan". I will change the 
file sections. But would suggest to keep the fromJsonString method.


> write it back in the original plan file

I updated the terminology section for what we consider an "upgrade". We 
might need to update the original plan file. This is already considered 
in the COMPILE PLAN ... FROM ... even though this is future work. Also 
savepoint migration.


Thanks for all the feedback!

Timo


On 30.11.21 14:28, Timo Walther wrote:

Response to Wenlong's feedback:

 > I would prefer not to provide such a shortcut, let users use  COMPILE 
PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by 
new users even without inferring the docs.


I would like to hear more opinions on this topic. Personally, I find a 
combined statement very useful. Not only for quicker development and 
debugging but also for readability. It helps in keeping the JSON path 
and the query close to each other in order to know the origin of the plan.


 > but the plan and SQL are not matched. The result would be quite 
confusing if we still execute the plan directly, we may need to add a 
validation.


You are right that there could be a mismatch. But we have a similar 
problem when executing CREATE TABLE IF NOT EXISTS. The schema or options 
of a table could have changed completely in the catalog but the CREATE 
TABLE IF NOT EXISTS is not executed again. So a mismatch could also 
occur there.


Regards,
Timo

On 30.11.21 14:17, Timo Walther wrote:

Hi everyone,

thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

 > Will the JSON plan's schema be considered an API?

No, not in the first version. This is explicitly mentioned in the 
`General JSON Plan Assumptions`. I tried to improve the section once 
more to make it clearer. However, the JSON plan is definitely stable 
per minor version. And since the plan is versioned by Flink version, 
external tooling could be built around it. We might make it public API 
once the design has settled.


 > Given that upgrades across multiple versions at once are 
unsupported, do we verify this somehow?


Good question. I extended the `General JSON Plan Assumptions`. Now 
yes: the Flink version is part of the JSON plan and will be verified 
during restore. But keep in mind that we might support more that just 
the last version at least until the JSON plan has been migrated.


Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:
I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go
with the config option instead, that doesn't provide the flexibility to
overwrite certain plans but not others, since the config applies globally;
isn't that something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas wrote:



Hi Timo!

Thanks a lot for taking all that time and effort to put together this
proposal!

Regarding:
For simplification of the design, we assume that upgrades use a step size
of a single minor version. We don't guarantee skipping minor versions
(e.g. 1.11 to 1.14).

I think that for this first step we should make it absolutely clear

Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-11-30 Thread Timo Walther

Response to Wenlong's feedback:

> I would prefer not to provide such a shortcut, let users use  COMPILE 
PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by 
new users even without inferring the docs.


I would like to hear more opinions on this topic. Personally, I find a 
combined statement very useful. Not only for quicker development and 
debugging but also for readability. It helps in keeping the JSON path 
and the query close to each other in order to know the origin of the plan.


> but the plan and SQL are not matched. The result would be quite 
confusing if we still execute the plan directly, we may need to add a 
validation.


You are right that there could be a mismatch. But we have a similar 
problem when executing CREATE TABLE IF NOT EXISTS. The schema or options 
of a table could have changed completely in the catalog but the CREATE 
TABLE IF NOT EXISTS is not executed again. So a mismatch could also 
occur there.


Regards,
Timo

On 30.11.21 14:17, Timo Walther wrote:

Hi everyone,

thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

 > Will the JSON plan's schema be considered an API?

No, not in the first version. This is explicitly mentioned in the 
`General JSON Plan Assumptions`. I tried to improve the section once 
more to make it clearer. However, the JSON plan is definitely stable per 
minor version. And since the plan is versioned by Flink version, 
external tooling could be built around it. We might make it public API 
once the design has settled.


 > Given that upgrades across multiple versions at once are unsupported, 
do we verify this somehow?


Good question. I extended the `General JSON Plan Assumptions`. Now yes: 
the Flink version is part of the JSON plan and will be verified during 
restore. But keep in mind that we might support more than just the last 
version, at least until the JSON plan has been migrated.


Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:
I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose 
to go

with the config option instead,
that doesn't provide the flexibility to overwrite certain plans but not
others, since the config applies globally, isn't that
something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas  
wrote:



Hi Timo!

Thanks a lot for taking all that time and effort to put together this
proposal!

Regarding:
For simplification of the design, we assume that upgrades use a step 
size

of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

I think that for this first step we should make it absolutely clear 
to the

users that they would need to go through all intermediate versions
to end up with the target version they wish. If we are to support 
skipping

versions in the future, i.e. upgrade from 1.14 to 1.17, this means
that we need to have a testing infrastructure in place that would 
test all

possible combinations of version upgrades, i.e. from 1.14 to 1.15,
from 1.14 to 1.16 and so forth, while still testing and of course
supporting all the upgrades from the previous minor version.

I like a lot the idea of introducing HINTS to define some behaviour 
in the

programs!
- the hints live together with the sql statements and consequently the
(JSON) plans.
- If multiple queries are involved in a program, each one of them can
define its own config (regarding plan optimisation, not null 
enforcement,

etc)

I agree with Francesco on his argument regarding the *JSON* plan. I
believe we should already provide flexibility here, since (who knows) in
the future
a JSON plan might not fulfil the desired functionality.

I also agree that we need some very obvious way (i.e. not log entry) to
show the users that their program doesn't support version upgrades, and
prevent them from being negatively surprised in the future, when 
trying to

upgrade their production pipelines.

This is an implementation detail, but I'd like to add that there 
should be

some good logging in place when the upgrade is taking place, to be
able to track every restoration action, and help debug any potential
issues arising from that.





On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann 
wrote:

Thanks for writing this FLIP Timo. I think this will be a very 
important

improvement for Flink and our SQL users :-)

Similar to Francesco I would like to understand the statement


For simplification of the design, we assume that upgrades use a step

size
of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

a bit better. Is it because Flink does not guarantee that a savepoint
created by version 1.x can be directly recovered by version 1.y with 
x + 1
< y but users might have to go through a cascade of upgrades? From 
how I
understand your proposal, the compiled plan won't be changed after 
being

written initially. Hence, I would assume that for the plan alone 

Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-11-30 Thread Timo Walther

Hi everyone,

thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

> Will the JSON plan's schema be considered an API?

No, not in the first version. This is explicitly mentioned in the 
`General JSON Plan Assumptions`. I tried to improve the section once 
more to make it clearer. However, the JSON plan is definitely stable per 
minor version. And since the plan is versioned by Flink version, 
external tooling could be built around it. We might make it public API 
once the design has settled.


> Given that upgrades across multiple versions at once are unsupported, 
do we verify this somehow?


Good question. I extended the `General JSON Plan Assumptions`. Now yes: 
the Flink version is part of the JSON plan and will be verified during 
restore. But keep in mind that we might support more than just the last 
version, at least until the JSON plan has been migrated.


Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:

I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go
with the config option instead,
that doesn't provide the flexibility to overwrite certain plans but not
others, since the config applies globally, isn't that
something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas  wrote:


Hi Timo!

Thanks a lot for taking all that time and effort to put together this
proposal!

Regarding:

For simplification of the design, we assume that upgrades use a step size

of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

I think that for this first step we should make it absolutely clear to the
users that they would need to go through all intermediate versions
to end up with the target version they wish. If we are to support skipping
versions in the future, i.e. upgrade from 1.14 to 1.17, this means
that we need to have a testing infrastructure in place that would test all
possible combinations of version upgrades, i.e. from 1.14 to 1.15,
from 1.14 to 1.16 and so forth, while still testing and of course
supporting all the upgrades from the previous minor version.

I like a lot the idea of introducing HINTS to define some behaviour in the
programs!
- the hints live together with the sql statements and consequently the
(JSON) plans.
- If multiple queries are involved in a program, each one of them can
define its own config (regarding plan optimisation, not null enforcement,
etc)

I agree with Francesco on his argument regarding the *JSON* plan. I
believe we should already provide flexibility here, since (who knows) in
the future
a JSON plan might not fulfil the desired functionality.

I also agree that we need some very obvious way (i.e. not log entry) to
show the users that their program doesn't support version upgrades, and
prevent them from being negatively surprised in the future, when trying to
upgrade their production pipelines.

This is an implementation detail, but I'd like to add that there should be
some good logging in place when the upgrade is taking place, to be
able to track every restoration action, and help debug any potential
issues arising from that.





On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann 
wrote:


Thanks for writing this FLIP Timo. I think this will be a very important
improvement for Flink and our SQL users :-)

Similar to Francesco I would like to understand the statement


For simplification of the design, we assume that upgrades use a step

size
of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

a bit better. Is it because Flink does not guarantee that a savepoint
created by version 1.x can be directly recovered by version 1.y with x + 1
< y but users might have to go through a cascade of upgrades? From how I
understand your proposal, the compiled plan won't be changed after being
written initially. Hence, I would assume that for the plan alone Flink
will
have to give backwards compatibility guarantees for all versions. Am I
understanding this part correctly?

Cheers,
Till

On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <
france...@ververica.com>
wrote:


Hi Timo,

Thanks for putting this amazing work together, I have some
considerations/questions
about the FLIP:
*Proposed changes #6*: Other than defining this rule of thumb, we must
also make sure
that compiling plans with objects that cannot be serialized in the
plan fails hard,
so users don't bite themselves with such issues, or at least we need to
output warning
logs. In general, whenever the user is trying to use the CompiledPlan

APIs

and at the same
time, they're trying to do something "illegal" for the plan, we should
immediately either
log or fail depending on the issue, in order to avoid any surprises once
the user upgrades.
I would also say the same for things like registering a function,
registering a DataStream,
and for every other thing which won't end up in the plan, we should log
such info to the
user by 

Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-30 Thread Timo Walther

Thanks for the healthy discussion. Also +1 from my side for this FLIP.

Thanks,
Timo

On 24.11.21 19:05, Stephan Ewen wrote:

Thanks for all the details and explanation.

With the conclusion of the discussion, also +1 from my side for this FLIP

On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li  wrote:


Thanks Stephan and Timo, I have a rough look at your replies. They are
all valuable opinions. I will take time to discuss, explain and
improve them.

Hi Timo,

At least a final "I will start the vote soon. Last call for comments."

would have been nice.

I replied in the DISCUSS thread that we had begun to vote. If there are
supplementary comments or a reply like "pause voting first, I will reply
later", we can suspend or cancel the vote at any time.
I understand why the FLIP must take three days to vote, so that more
people can see it and put forward their opinions.

Best,
Jingsong

On Sat, Nov 13, 2021 at 1:27 AM Timo Walther  wrote:


Hi everyone,

even though the DISCUSS thread was open for 2 weeks, I have the feeling
that the VOTE was initiated too quickly. At least a final "I will start
the vote soon. Last call for comments." would have been nice.

I also added some comments in the DISCUSS thread. Let's hope we can
resolve those soon.

Regards,
Timo

On 12.11.21 16:36, Stephan Ewen wrote:

Hi all!

I have a few questions on the design still, posted those in the

[DISCUSS]

thread.
It would be great to clarify those first before concluding this vote.

Thanks,
Stephan


On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:


+1 (binding)

Thanks for the great work Jingsong!

Best,
Jark

On Thu, 11 Nov 2021 at 19:41, JING ZHANG 

wrote:



+1 (non-binding)

A small suggestion:
The message queue is currently used to store the middle-layer data of the
streaming data warehouse. We hope to use the built-in dynamic table storage
to store those middle layers.
But that middle data of the streaming data warehouse is often provided to
all business teams in a company. Some teams have not used Apache Flink as
their compute engine yet. In order to continue serving those teams, the
data in the built-in dynamic table storage may need to be copied to the
message queue again.
If *the built-in storage could provide the same consumer API as the
commonly used message queues*, data copying may be avoided. So the
built-in dynamic table storage may be promoted faster in the streaming
data warehouse business.

Best regards,
Jing Zhang

Yufei Zhang wrote on Thu, Nov 11, 2021 at 9:34 AM:


Hi,

+1 (non-binding)

Very interesting design. I saw a lot of discussion on the generic
interface design, good to know it will address extensibility.

Cheers,
Yufei


On 2021/11/10 02:51:55 Jingsong Li wrote:

Hi everyone,

Thanks for all the feedback so far. Based on the discussion[1] we

seem

to have consensus, so I would like to start a vote on FLIP-188 for
which the FLIP has now also been updated[2].

The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless
there is an objection or insufficient votes.

[1]

https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7

[2]







https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage


Best,
Jingsong














--
Best, Jingsong Lee







[DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-11-21 Thread Timo Walther

Hi everyone,

as many of you know, one of the biggest weaknesses of Flink's Table & 
SQL API is the difficulty of stateful upgrades between Flink 
minor versions (e.g. 1.13->1.14). Currently, we cannot provide any 
backwards compatibility guarantees in those scenarios and need to force users to 
reprocess historical data in order to "warm-up"/"bootstrap" their query 
state again.


In this FLIP, we would like to improve this situation and propose an 
upgrade story for Flink SQL. Preliminary work has been done in the last 
release. In the upcoming releases we would like to finalize and expose this.


The core idea is centered around a JSON plan that can be compiled from a 
SQL statement or statement set. The JSON plan represents a static 
topology (a graph of `ExecNode` in the planner) after optimization that 
can be restored in future Flink versions. The community will version 
corresponding execution nodes and take care of maintaining them across 
Flink versions.


Looking forward to your feedback:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489

Regards,
Timo

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution


Re: [DISCUSS] Shall casting functions return null or throw exceptions for invalid input

2021-11-18 Thread Timo Walther

Hi everyone,


thanks for finally having this discussion on the mailing list. As both a 
contributor and a user, I have experienced a couple of issues around 
nullability coming out of nowhere in a pipeline. This discussion should 
not only cover CAST but failure handling in general.


Let me summarize my opinion:

1) CAST vs. TRY_CAST

CAST is a SQL standard core operation with well-defined semantics across 
all major SQL vendors. There should be no discussion whether it returns 
NULL or an error. The semantics are already defined externally. I don't 
agree with "Streaming computing is a resident program ... users do not
want it to frequently fail", the same argument is also true for nightly 
batch jobs. A batch job can also get stuck through a SQL statement that 
is not defined leniently enough by the user.


An option that restores the old behavior and TRY_CAST for the future 
should solve this use case and make all parties happy.
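
For illustration, a minimal sketch of the two behaviors discussed here 
(assuming the proposed TRY_CAST becomes available; names and the option 
below are illustrative, not final):

-- standard semantics: fails at runtime, the input cannot be cast
SELECT CAST('not a number' AS INT);

-- lenient variant: returns NULL instead of failing
SELECT TRY_CAST('not a number' AS INT);

-- the old NULL-returning behavior of CAST would be restored via a
-- configuration option, e.g. something like
-- SET 'table.exec.legacy-cast-behaviour' = 'enabled';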


2) TO_TIMESTAMP / TO_DATE

We should distinguish between CASTING and CONVERSION / PARSING. As a 
user, I would expect that parsing can fail and have to deal with this 
accordingly. Therefore, I'm fine with returning NULL in TO_ or CONVERT_ 
functions. This is also consistent with other vendors. Take PARSE of SQL 
Server as an example [1]: "If a parameter with a null value is passed at 
run time, then a null is returned, to avoid canceling the whole batch.". 
Here we can be more flexible with the semantics because users need to 
read the docs anyway.
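
To make the distinction concrete, a small sketch (assuming the parsing 
functions keep their NULL-on-failure behavior):

-- parsing: a malformed input yields NULL instead of an error
SELECT TO_TIMESTAMP('not-a-timestamp', 'yyyy-MM-dd HH:mm:ss');

-- casting: under the standard semantics argued for above, this should fail
SELECT CAST('not-a-timestamp' AS TIMESTAMP);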


3) Null at other locations

In general, we should stick to our data type constraints. Everything 
else will mess up the architecture of functions/connectors and their 
return types. Take the rowtime (event-time timestamp) attribute as an 
example: PRs like the one for FLINK-24885 are just the tip of the 
iceberg. If we allowed rowtime columns to be NULL, we would need to 
check all time-based operators and implement additional handling logic 
for this.


It would be better to define unified error-handling for operators and 
maybe drop rows if the per-element processing failed. We should have a 
unified approach for how to log/side-output such records.


Until this is in place, I would suggest we spend some time on rules that 
can be enabled via an option to modify the plan and wrap frequently 
failing expressions in a generic TRY() function. In this case, we 
don't need to deal with NULL in all built-in functions, we can throw 
helpful errors during development, and can return NULL even though the 
return type is NOT NULL. It would also make the NULL returning explicit 
in the plan.
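
Purely as an illustration of that idea (TRY() does not exist today; the 
function name, the table, and the rewrite are hypothetical):

-- what the user writes
SELECT CAST(price AS INT) FROM orders;

-- what an opt-in planner rule could rewrite it to; the failure handling
-- becomes explicit in the plan and returns NULL on error
SELECT TRY(CAST(price AS INT)) FROM orders;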


Regards,
Timo





[1] 
https://docs.microsoft.com/en-us/sql/t-sql/functions/parse-transact-sql?view=sql-server-ver15

[2] https://issues.apache.org/jira/browse/FLINK-24885





On 18.11.21 11:34, Kurt Young wrote:

Sorry, I forgot to add the user ML. I would also like to gather some user
feedback on this,
since I didn't get any feedback on this topic from users before.

Best,
Kurt


On Thu, Nov 18, 2021 at 6:33 PM Kurt Young  wrote:


(added user ML to this thread)

HI all,

I would like to raise a different opinion about this change. I agree with
Ingo that
we should not just break some existing behavior, and even if we introduce
an
option to control the behavior, I would propose to set the default value
to the current
behavior.

I want to mention one angle to assess whether we should change it or not,
which is "what could users benefit from the changes". To me, it looks like:

* new users: happy about the behavior
* existing users: suffer from the change; it either causes them to modify
their SQL or gets them a late-night call reporting that their online job
crashed and couldn't restart.

I would like to quote another breaking change we did when we adjusted the
time-related functions in FLIP-162 [1]. In that case, both new and existing
users suffered from the *incorrectly* implemented time function behavior,
and we saw a lot of feedback and complaints from various channels. After we
fixed that, we never saw related problems again.

Back to this topic, have we ever seen a user complain about the current CAST
behavior? From my
side, no.

To summarize:

+1 to introduce TRY_CAST to better prepare for the future.
-1 to modify the default behavior.
+0 to introduce a config option, but with the default value set to the existing
behavior. It's +0 because it
seems unnecessary if I'm -1 on changing the default behavior and I also
don't see an urgent need to modify it.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior

Best,
Kurt


On Thu, Nov 18, 2021 at 4:26 PM Ingo Bürk  wrote:


Hi,

first of all, thanks for the summary of both sides, and for bringing up
the
discussion on this.
I think it is obvious that this is not something we can just "break", so
the config option seems mandatory to me.

Overall I agree with Martijn and Till that throwing errors is the more
expected behavior. I mostly think this is valuable 

[ANNOUNCE] New Apache Flink Committer - Jing Zhang

2021-11-15 Thread Timo Walther

Hi everyone,

On behalf of the PMC, I'm very happy to announce Jing Zhang as a new 
Flink committer.


Jing has been very active in the Flink community esp. in the Table/SQL 
area for quite some time: 81 PRs [1] in total, and she is also active in 
answering questions on the user mailing list. She is currently 
contributing a lot around the new windowing table-valued functions [2].


Please join me in congratulating Jing Zhang for becoming a Flink committer!

Thanks,
Timo

[1] https://github.com/apache/flink/pulls/beyond1920
[2] https://issues.apache.org/jira/browse/FLINK-23997


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-12 Thread Timo Walther

Hi everyone,

even though the DISCUSS thread was open for 2 weeks, I have the feeling 
that the VOTE was initiated too quickly. At least a final "I will start 
the vote soon. Last call for comments." would have been nice.


I also added some comments in the DISCUSS thread. Let's hope we can 
resolve those soon.


Regards,
Timo

On 12.11.21 16:36, Stephan Ewen wrote:

Hi all!

I have a few questions on the design still, posted those in the [DISCUSS]
thread.
It would be great to clarify those first before concluding this vote.

Thanks,
Stephan


On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:


+1 (binding)

Thanks for the great work Jingsong!

Best,
Jark

On Thu, 11 Nov 2021 at 19:41, JING ZHANG  wrote:


+1 (non-binding)

A small suggestion:
The message queue is currently used to store the middle-layer data of the
streaming data warehouse. We hope to use the built-in dynamic table storage
to store those middle layers.
But that middle data of the streaming data warehouse is often provided to
all business teams in a company. Some teams have not used Apache Flink as
their compute engine yet. In order to continue serving those teams, the data
in the built-in dynamic table storage may need to be copied to the message
queue again.
If *the built-in storage could provide the same consumer API as the commonly
used message queues*, data copying may be avoided. So the built-in dynamic
table storage may be promoted faster in the streaming data warehouse
business.

Best regards,
Jing Zhang

Yufei Zhang wrote on Thu, Nov 11, 2021 at 9:34 AM:


Hi,

+1 (non-binding)

Very interesting design. I saw a lot of discussion on the generic
interface design, good to know it will address extensibility.

Cheers,
Yufei


On 2021/11/10 02:51:55 Jingsong Li wrote:

Hi everyone,

Thanks for all the feedback so far. Based on the discussion[1] we

seem

to have consensus, so I would like to start a vote on FLIP-188 for
which the FLIP has now also been updated[2].

The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless
there is an objection or insufficient votes.

[1] https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
[2]





https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage


Best,
Jingsong













Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-12 Thread Timo Walther

Hi everyone,

sorry for the delay in joining this thread. I went through the FLIP and 
have some comments (maybe overlapping with Stephan's comments, which I 
haven't read yet):


a. > More importantly, in order to solve the cognitive bar...

It would be great if we can add not only `Receive any type of changelog` 
but also `Receive any type of datatype`.



b. > COMPACT [...] Compact table for high performance query. Launch a 
job to rewrite files.


Please clarify whether this is a synchronous or asynchronous operation 
in the API. So far all DDL has been synchronous and only DML asynchronous.


c. > 'change-tracking' = 'false'

I find this option a bit confusing. Even in batch scenarios we have a 
changelog, only with insert-only changes. Can you elaborate? Wouldn't 
'exclude-from-log-store' or 'exclude-log-store' or 'log.disabled' be 
more accurate?


d. > DESCRIBE DETAIL TABLE

This seems very uncommon for SQL. How about `DESCRIBE TABLE EXTENDED`?

e. > Set checkpoint interval to 1 min if checkpoint is not enabled when 
the planner detects a sink to built-in dynamic table.


This sounds like too much magic to me. It will be super hard to debug 
why suddenly checkpointing is enabled. If a user has not configured the 
checkpointing yet, it could lead to unintended behavior without a proper 
checkpoint storage. It would be better to either throw an exception or 
just have weaker consistency guarantees in this case.


f. > GenericCatalog

Why do we need an additional marker interface without any methods in 
there? It only further complicates the catalog interfaces. Isn't a 
`Catalog#supportsTableStorage` enough? Also, we just introduced 
`CatalogBaseTable.TableKind` for exactly such new features. We can add a 
new table kind. A managed table can set this for a `CatalogTable`.


g. > enrichOptions(Context context)

Why is this method returning a Map? Shouldn't the caller 
assume that all options enriched via `CatalogTable.copy` should have 
been applied by `enrichOptions`?



h. Partitioning and Event-time:

Have you considered supporting semantics similar to 
`sink.partition-commit.trigger` based on `partition-time`? It could be 
beneficial to have the partitions committed by watermarks as well. My 
biggest concern is how we can enable watermarking end-to-end using a 
file store (I think for log store this should not be a problem?).


Looking forward to your feedback.

Regards,
Timo


On 12.11.21 16:35, Stephan Ewen wrote:

Hi all!

Thank you for the writeup of this feature. I like the general direction a
lot.

There are some open questions and confusing details still, which I think we
need to clarify first to make this feature really good.
Below are questions/suggestions on the FLIP:

Best,
Stephan

===

*(1) Log Implementation*

I agree with Eron that we should not design this hardwired to Kafka. Let's
have the internal interfaces in place to make this open to other streaming
storage systems as well.
The config options seem to be designed in a way that is Kafka-exclusive.
Can we change this, for example to something like
   - storage.log.system=kafka
   - storage.log.kafka.properties.bootstrap.servers
   - storage.log.kafka.retention

*(2) Change Tracking*

I am not sure I understand this fully. When a batch query inserts without
change tracking what happens then?
   - does it skip writing to the change log?
   - does it simply overwrite the managed table with the new result?
   - something different?

*(3) "table-storage.log.scan.startup.mode"*

Somehow the presence of this flag seems to break the abstraction of managed
tables.
Let's say someone creates a managed table that is computed via a query over
another managed table. It would need all the data from the previous table,
or it would be inconsistent.

What is the reason to have this setting? Support cases where one doesn't
need all past data (let's say only data from the previous month)? Exposing
this again somewhat destroys the nice "transparent out of the box"
behavior, because now users need to think again about the incremental
building of the tables. I think that case shows that we miss a bit better
handling of data retention (see next point).

Also, this seems to be a per-query setting, more than a global setting, so
should this be part of the config with which the query is submitted that
reads from the table-storage?

The names could also be improved a bit, I think, for example we could call
it just  "table-storage.log.scan" with values "full", "latest",
"from-timestamp".

*(4) Data retention*

I am wondering how and when data is ever cleaned up.
For example, when the table definition has a time attribute and predicate
so that the managed table should only contain the data from the previous
month. How does old data get cleaned up? Only through deletes coming from
timers in the Flink SQL layer?

I think if we want this to be really good and efficient, we need to look at
dropping data during the compaction. The compaction should know 

[jira] [Created] (FLINK-24877) Implicitly add time attributes for VALUES clause

2021-11-11 Thread Timo Walther (Jira)
Timo Walther created FLINK-24877:


 Summary: Implicitly add time attributes for VALUES clause
 Key: FLINK-24877
 URL: https://issues.apache.org/jira/browse/FLINK-24877
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Timo Walther


This might be a big change that we cannot do due to backwards compatibility. 
But I would like to at least log the idea and hear the opinions of others.

In theory, we could make all top-level timestamp columns of the {{VALUES}} 
clause time attributes. Since VALUES are always bounded, a watermark strategy 
is implicitly added with MAX WATERMARK at the end. This means that any 
timestamp column can be used as a time attribute.

Given the following example:
{code}
VALUES
 (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),
 (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),
 (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),
 (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')
{code}

We could define windows or interval joins for testing or examples. It would 
make the starting experience nicer. With UNION ALL a user can have a minimum 
set of records in an otherwise streaming pipeline (after checkpointing finished 
tasks).
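
For example, a query like the following sketch would become possible (today 
it is rejected because the timestamp column of VALUES is not a time 
attribute; the column names are illustrative):
{code}
SELECT
  TUMBLE_START(ts, INTERVAL '1' DAY) AS window_start,
  COUNT(*) AS cnt
FROM (
  VALUES
    (1, TIMESTAMP '2020-03-08 13:12:11.123'),
    (2, TIMESTAMP '2020-03-09 13:12:11.123'),
    (3, TIMESTAMP '2020-03-10 13:12:11.123')
) AS T(id, ts)
GROUP BY TUMBLE(ts, INTERVAL '1' DAY)
{code}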



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] FLIP-189: SQL Client Usability Improvements

2021-11-05 Thread Timo Walther

+1 (binding) thanks for working on this.

Regards,
Timo


On 05.11.21 10:14, Sergey Nuyanzin wrote:

Also there is a short demo showing some of the features mentioned in this
FLIP.
It is available at https://asciinema.org/a/446247?speed=3.0 (It was also
mentioned in [DISCUSS] thread)

On Wed, Nov 3, 2021 at 11:04 PM Sergey Nuyanzin  wrote:



Hi everyone,

I would like to start a vote on FLIP-189: SQL Client Usability
Improvements [1].
The FLIP was discussed in this thread [2].
FLIP-189 targets usability improvements of SQL Client such as parsing
improvement,
syntax highlighting, completion, prompts

The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-189%3A+SQL+Client+Usability+Improvements
[2] https://lists.apache.org/thread/8d580jcqzpcbmfwqvhjso82hdd2x0461

--
Best,
Sergey








[jira] [Created] (FLINK-24803) Fix cast BINARY/VARBINARY to STRING

2021-11-05 Thread Timo Walther (Jira)
Timo Walther created FLINK-24803:


 Summary: Fix cast BINARY/VARBINARY to STRING
 Key: FLINK-24803
 URL: https://issues.apache.org/jira/browse/FLINK-24803
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


BINARY/VARBINARY should be printed as regular arrays instead of interpreting 
them in an arbitrary character set as a string.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24802) Improve cast ROW to STRING

2021-11-05 Thread Timo Walther (Jira)
Timo Walther created FLINK-24802:


 Summary: Improve cast ROW to STRING
 Key: FLINK-24802
 URL: https://issues.apache.org/jira/browse/FLINK-24802
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


When casting ROW to string, we should have a space after the comma to be 
consistent with ARRAY, MAP, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-189: SQL Client Usability Improvements

2021-11-03 Thread Timo Walther

Hi Sergey,

thanks for your explanation.

Regarding keywords and other info: We should receive the information 
from the Flink SQL parser directly. We have added a couple of new 
keywords such as WATERMARK or MATCH_RECOGNIZE clauses. SQL92 would not 
help a user understand why a column name needs to be escaped. And in 
general, we should not have duplicate code. Let's discuss this when it 
comes to the implementation. I'm sure we can propagate the Calcite 
parser config into a nice POJO that the CLI can receive from the Executor.


Regards,
Timo


On 03.11.21 11:12, Sergey Nuyanzin wrote:

Hi 李宇彬,

I think you are right. Thank you very much for the idea.
I came across MySQL[1] and PostgreSQL[2] prompts and also
found several interesting features like control symbols to change style,
showing current property value and different datetime formats.

I have added your proposals and my findings to FLIP's page, please have a
look.

[1] https://dev.mysql.com/doc/refman/8.0/en/mysql-commands.html
[2] https://www.postgresql.org/docs/14/app-psql.html#APP-PSQL-PROMPTING

On Wed, Nov 3, 2021 at 2:31 AM 李宇彬  wrote:


Hi Sergey


It is a very useful improvement that I'm looking forward to. In addition, I
think the prompt can play a greater role.


To help users call commands in the expected context, we can get the session
context (current catalog/db/time) from the CLI prompt like MySQL does.
Please see the details below:


https://issues.apache.org/jira/browse/FLINK-24730




On 11/2/2021 21:09, Sergey Nuyanzin wrote:
Hi Timo

Thank you for your questions.

I will answer your questions here and update FLIP's page as well

For example, who is responsible for parsing comments? I guess the SQL
Client and not the Flink SQL parser will take care of this?
Yes, you are right. SQL Client is responsible for parsing here.
However, it does not validate SQL; it only validates that comments, brackets
and quotes are closed and that the statement ends with a semicolon.
Also, under the hood, JLine splits the input into words and works with them.
Within a custom parser it is possible to specify what should be considered
as a word or not considered at all (e.g. it is possible to remove
all line and block comments before submitting a query,
probably as another non-default option)... During parsing it marks
what is a comment, a keyword, a quoted string etc. based on rules
defined in the SQL Client parser.
The SQL Client highlighter could use the result of this marking to highlight.
The completer could use it for completion, e.g. if, based on the parser's
marks, the completer knows that the cursor is inside a comment or a string,
then there is no need to complete anything.

Also, will the prompt hints for `'>` and ``>` support escaping? This can
be a tricky topic sometimes.
Ideally yes, I played with lots of tricky cases and it behaves ok.
At least I do not see limitations here.
In case you do please share...

In general, how do we deal with different SQL dialects in the SQL
Client? For example, it is possible to set `table.sql-dialect` to `HIVE`. Will
all highlighting, auto-completion and prompt hints be disabled in this case?
It could be turned off in the beginning.
To make it supported across different dialects it is required to have such
info:
1) Set of keywords
2) Quote sign
3) SQL identifier quote
4) Start of a line comment
5) Start and end of a block comment
6) Start and end of hints
I see at least 2 ways:
1. provide such an API
2. create this mapping in SQL Client and use it based on the current dialect
Then it will be easy to support a new dialect
Here the only questionable thing is keywords.
Currently I made it pretty straightforward:
if a word is not inside a quoted string, not inside a comment or a hint,
and matches anything from
SQL92 (

*org.apache.calcite.sql.parser.SqlAbstractParserImpl#getSql92ReservedWords*),

then it will be highlighted as a keyword.

On Tue, Nov 2, 2021 at 12:09 PM Timo Walther  wrote:

Hi Sergey,

thanks for this nice demo video. It looks very nice and makes the SQL
Client an even more useful tool.

What I miss a bit in the FLIP is the implementation details.

For example, who is responsible for parsing comments? I guess the SQL
Client and not the Flink SQL parser will take care of this?

Also, will the prompt hints for `'>` and ``>` support escaping? This can
be a tricky topic sometimes.

In general, how do we deal with different SQL dialects in the SQL
Client? For example, it is possible to set `table.sql-dialect` to `HIVE`. Will
all highlighting, auto-complete and prompt hints be disabled in this case?


Looking forward to having this in Flink.

Thanks,
Timo



On 02.11.21 08:26, Till Rohrmann wrote:
Hi Sergey,

I think that after answering/resolving Jark's comments, you can start a
[VOTE] thread for this FLIP. The process is described here [1]. Once the
FLIP has been accepted, which it should, given the positive feedback, you
can start working on it by creating the corresponding JIRA tickets and
then
start coding. I hope that there will be a committer familiar with the SQL
cli

Re: [DISCUSS] FLIP-189: SQL Client Usability Improvements

2021-11-02 Thread Timo Walther

Hi Sergey,

thanks for this nice demo video. It looks very nice and makes the SQL 
Client an even more useful tool.


What I miss a bit in the FLIP is the implementation details.

For example, who is responsible for parsing comments? I guess the SQL 
Client and not the Flink SQL parser will take care of this?


Also, will the prompt hints for `'>` and ``>` support escaping? This can 
be a tricky topic sometimes.


In general, how do we deal with different SQL dialects in the SQL 
Client? For example, it is possible to set `table.sql-dialect` to `HIVE`. Will 
all highlighting, auto-complete and prompt hints be disabled in this case?



Looking forward to having this in Flink.

Thanks,
Timo



On 02.11.21 08:26, Till Rohrmann wrote:

Hi Sergey,

I think that after answering/resolving Jark's comments, you can start a
[VOTE] thread for this FLIP. The process is described here [1]. Once the
FLIP has been accepted, which it should, given the positive feedback, you
can start working on it by creating the corresponding JIRA tickets and then
start coding. I hope that there will be a committer familiar with the SQL
client that can help you with the code review and merging of the code. But
again, given the positive feedback, I think there will be a volunteer.

[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

Cheers,
Till

On Tue, Nov 2, 2021 at 4:25 AM Jark Wu  wrote:


Awesome demo, looking forward to these features!

I only have a minor comment: could we provide a config to enable/disable
the prompt values?
We can also discuss whether we can enable all the new features by default
to give them more exposure.

Best,
Jark

On Tue, 2 Nov 2021 at 10:48, JING ZHANG  wrote:


Amazing improvements and impressive video.
Big +1.

Best,
JING ZHANG

Kurt Young wrote on Tue, Nov 2, 2021 at 9:37 AM:


Really cool improvements @Sergey. Can't wait to see it happen.

Best,
Kurt


On Tue, Nov 2, 2021 at 1:56 AM Martijn Visser 
wrote:


Hi Sergey,

I guess you've just set a new standard ;-) I agree with Ingo, these
improvements look really good!

Best regards,

Martijn

On Mon, 1 Nov 2021 at 18:23, Ingo Bürk  wrote:


Hi Sergey,

I think those improvements look absolutely amazing. Thanks for the

little

video!


Best
Ingo

On Mon, Nov 1, 2021, 17:15 Sergey Nuyanzin 

wrote:



Thanks for the feedback Till.

Martijn, I have created a short demo showing some of the features
mentioned in the FLIP.
It is available at https://asciinema.org/a/446247?speed=3.0
Could you please tell if it is what you are expecting or not?

On Fri, Oct 29, 2021 at 4:59 PM Till Rohrmann <

trohrm...@apache.org>

wrote:


Thanks for creating this FLIP Sergey. I think what you propose sounds
like very good improvements for the SQL client. This should make the
client a lot more ergonomic :-)

Cheers,
Till

On Fri, Oct 29, 2021 at 11:26 AM Sergey Nuyanzin <

snuyan...@gmail.com>

wrote:


Hi Martijn,

Thank you for your suggestion with the POC.
Yes, I will do that and come back to this thread, probably after the
weekend.


On Thu, Oct 28, 2021 at 4:38 PM Martijn Visser <

mart...@ververica.com>

wrote:


Hi Sergey,

Thanks for taking the initiative to create a FLIP and propose
improvements on the SQL client. All usability improvements on the SQL
client are highly appreciated, especially for new users of Flink.
Multi-line support is definitely one of those things I've run into myself.

I do think it would be quite nice if there would be some kind of POC
which could show (some of) the proposed improvements. Is that something
that might be easily feasible?

Best regards,

Martijn

On Thu, 28 Oct 2021 at 11:02, Sergey Nuyanzin <

snuyan...@gmail.com



wrote:



Hi all,

I want to start a discussion about FLIP-189: SQL Client Usability
Improvements.

The main changes in this FLIP:

- Flink sql client parser improvements so
that sql client does not ask for ; inside a quoted string or a comment

- use prompt to show what sql client is waiting for
- introduce syntax highlighting
- improve completion

For more detailed changes, please refer to FLIP-189[1].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-189%3A+SQL+Client+Usability+Improvements




Look forward to your feedback.

--
Best regards,
Sergey






--
Best regards,
Sergey






--
Best regards,
Sergey

















[jira] [Created] (FLINK-24717) Push partitions before filters

2021-11-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-24717:


 Summary: Push partitions before filters
 Key: FLINK-24717
 URL: https://issues.apache.org/jira/browse/FLINK-24717
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Currently, we push filters before partitions. This means that an 
{{applyFilters}} implementation needs to have partition logic to extract the 
partition predicate. Furthermore, if {{applyFilters}} consumes all filters (no 
remaining predicates), {{applyPartitions}} is never called.

We should execute the {{PushPartitionIntoTableSourceScanRule}} first and check 
for side effects of this change.

See 
{{org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRuleTest#testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected}}
 for an example of using the new test infrastructure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24716) Non-equality predicates on partition columns lead to incorrect plans

2021-11-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-24716:


 Summary: Non-equality predicates on partition columns lead to 
incorrect plans
 Key: FLINK-24716
 URL: https://issues.apache.org/jira/browse/FLINK-24716
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Queries such as
{code}
SELECT d FROM T1 WHERE c = 100 AND d > '2012'
{code}

where {{d}} is a partition column lead to incorrect plans:

{code}
== Abstract Syntax Tree ==
LogicalProject(d=[$2])
+- LogicalFilter(condition=[AND(=($0, 100), >($2, _UTF-16LE'2012'))])
   +- LogicalTableScan(table=[[default_catalog, default_database, T1]])

== Optimized Physical Plan ==
Calc(select=[d], where=[=(c, 100)])
+- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], 
partitions=[], project=[c, d]]], fields=[c, d])

== Optimized Execution Plan ==
Calc(select=[d], where=[(c = 100)])
+- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], 
partitions=[], project=[c, d]]], fields=[c, d])
{code}

It seems in many cases (with SupportsFilterPushDown and without) the {{>}} 
predicate is swallowed and not part of the final execution plan anymore.

Reproducible code can be found 
[here|https://github.com/twalthr/flink/blob/e5a2cc9bcc9b38cf2b94c9ea7c7296ce94434343/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/TestClass.java]
 with new testing infrastructure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Over Aggregation add supports CURRENT_TIMESTAMP as the upper boundary in RANGE intervals .

2021-11-01 Thread Timo Walther

Hi,

this is an interesting idea. But as far as I can see, by looking at 
other SQL engines like Microsoft SQL Server:


https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver15

The RANGE boundary is a well-defined set of keywords, and CURRENT_TIMESTAMP is not 
listed there.


Regards,
Timo


On 01.11.21 08:22, 林挺滨 wrote:

Can anyone give me some advice or information about this feature?

林挺滨 wrote on Thu, Oct 14, 2021 at 1:33 PM:


In our scenario, it is often necessary to calculate the user's aggregated
indicators in the most recent period of time.
For example, if I need to calculate the user's recharge amount in the most
recent day, I can do it through the following SQL code.
-
CREATE TEMPORARY VIEW purchase as
select user_id, purchase_price, __ts__
from
raw_purchase;

CREATE TEMPORARY VIEW purchase_expire as
select user_id, 0 as purchase_price,
SESSION_ROWTIME (__ts__, INTERVAL '1' DAY + INTERVAL '1' SECOND) as __ts__
from
purchase as T
GROUP BY SESSION (T.__ts__, INTERVAL '1' DAY + INTERVAL '1' SECOND),
user_id, __ts__;

CREATE TEMPORARY VIEW total_purchase as
select * from purchase
union all
select * from purchase_expire;

select user_id, SUM(purchase_price) OVER (
PARTITION BY user_id
ORDER BY __ts__
RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
)
from total_purchase;
-

If the "Over Aggregation CURRENT_TIMESTAMP" is supported, the above code
can be replaced by the following simpler code, and the simpler code is easier
to understand.
-
select user_id, SUM(purchase_price) OVER (
PARTITION BY user_id
ORDER BY __ts__
RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT_TIMESTAMP
)
from raw_purchase;
-

I have seen the implementation of the RowTimeRangeBoundedPrecedingFunction
class. It is very simple to add support for CURRENT_TIMESTAMP.








[jira] [Created] (FLINK-24714) Validate partition columns for ResolvedCatalogTable

2021-11-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-24714:


 Summary: Validate partition columns for ResolvedCatalogTable
 Key: FLINK-24714
 URL: https://issues.apache.org/jira/browse/FLINK-24714
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Currently, partition columns are not validated and might not exist in the 
schema.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24700) Clarify semantics of filter, projection, partition, and metadata pushdown

2021-10-29 Thread Timo Walther (Jira)
Timo Walther created FLINK-24700:


 Summary: Clarify semantics of filter, projection, partition, and 
metadata pushdown
 Key: FLINK-24700
 URL: https://issues.apache.org/jira/browse/FLINK-24700
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-24165 has revealed a couple of shortcomings that occur when implementing 
multiple ability interfaces. We should improve the documentation and better 
define the semantics.

- Push produced type not only for metadata pushdown but also projection push 
down.
- Clarify order of filter + projection
- Clarify order of projection/filter + partition
- Simplify handling of partition columns
...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Shading in flink-table-blink and upgrade compatibility issue

2021-10-26 Thread Timo Walther

Hi Thomas,

thanks for your feedback.

The error that you are experiencing is definitely a bug in 1.13.3 and 
the missing method should be reintroduced in the next patch version to 
make code compiled against older patch versions run again.


Regarding the discussion points:

I agree that the flink-table-blink uber jar should not contain a dependency 
on flink-connector-base. Even filesystem connectors should be optional 
and put in a dedicated module that is not in /lib by default.


By having the flink-table-blink uber jar in /lib we would like to 
improve the SQL experience, as this API is as important as the DataStream 
API nowadays. But the dependencies should nevertheless be minimal.


Regards,
Timo


On 23.10.21 00:59, Thomas Weise wrote:

Hi,

As part of upgrading to Flink 1.13.3 from 1.13.2 we run into the
following problem with KafkaSource (Flink distribution is 1.13.2 and
the application was built with 1.13.3):

java.lang.NoSuchMethodError: 'void
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue,
java.util.function.Supplier, java.util.function.Consumer)'
at 
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager.(KafkaSourceFetcherManager.java:67)
at 
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:160)
at 
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:127)

It turns out that flink-table-blink_2.12-1.13.2.jar contains
flink-connector-base and because that jar is under lib the 1.13.2
connector base gets picked up instead of the one bundled in the
application jar.

(The constructor in
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager
was added in 1.13.3.)

There are a few points I would like to discuss:

1) Version compatibility: A *patch* version should ideally not
introduce such a change, it should be forward and backward compatible.
Hopefully this will be the case after 1.14 with stable source API.
2) flink-table-blink - if it is meant to be self contained and usable
as a library - should not leak its shaded dependencies. It contains
FileSource and other deps from Flink, can those be relocated?
3) Do we need flink-table-blink under lib? Can it be bundled with the
application instead? It would be great if the dependencies under lib
are strictly Flink core.

Thanks,
Thomas





Re: [DISCUSS] Should we drop Row SerializationSchema/DeserializationSchema?

2021-10-21 Thread Timo Walther

Hi Francesco,

thanks for starting this discussion. It is definitely time to clean up 
more connectors and formats that were used for the old planner but are 
actually not intended for the DataStream API.


+1 for deprecating and dropping the mentioned formats. Users can either 
use Table API or implement a custom 
SerializationSchema/DeserializationSchema according to their needs. It 
is actually not that complicated to add Jackson and configure the 
ObjectMapper for reading JSON/CSV.


Regards,
Timo


On 18.10.21 17:42, Francesco Guardiani wrote:

Hi all,
In flink-avro, flink-csv and flink-json we have implementations of
SerializationSchema/DeserializationSchema for the org.apache.flink.types.Row
type. In particular, I'm referring to:

- org.apache.flink.formats.json.JsonRowSerializationSchema
- org.apache.flink.formats.json.JsonRowDeserializationSchema
- org.apache.flink.formats.avro.AvroRowSerializationSchema
- org.apache.flink.formats.avro.AvroRowDeserializationSchema
- org.apache.flink.formats.csv.CsvRowDeserializationSchema
- org.apache.flink.formats.csv.CsvRowSerializationSchema

These classes were used in the old table planner, but now the table planner
doesn't use the Row type internally anymore, so these classes are unused
by the flink-table packages.

Because these classes are exposed (some have @PublicEvolving annotation)
there might be some users out there using them when using the DataStream
APIs, for example to convert an input stream of JSON from Kafka to a Row
instance.

Do you have any opinions about deprecating these classes in 1.15 and then
drop them in 1.16? Or are you using them? If yes, can you describe your use
case?

Thank you,

FG





[jira] [Created] (FLINK-24599) Make checking for type root and family less verbose

2021-10-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-24599:


 Summary: Make checking for type root and family less verbose
 Key: FLINK-24599
 URL: https://issues.apache.org/jira/browse/FLINK-24599
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


Currently, we use `LogicalTypeChecks.hasRoot()` and 
`LogicalTypeChecks.hasFamily()` for frequent checking of logical types. It was 
a conscious decision to not overload `LogicalType` with utility methods in the 
beginning. But the two mentioned methods would be nice to have available in 
`LogicalType` directly.

We suggest:
{code}
LogicalType#is(LogicalTypeRoot)
LogicalType#is(LogicalTypeFamily)
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24532) Invalid deletions for time-versioned join with ChangelogNormalize

2021-10-13 Thread Timo Walther (Jira)
Timo Walther created FLINK-24532:


 Summary: Invalid deletions for time-versioned join with 
ChangelogNormalize
 Key: FLINK-24532
 URL: https://issues.apache.org/jira/browse/FLINK-24532
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Timo Walther


ChangelogNormalize replaces a +D row with a -U row derived from the previously 
inserted row. This means that the rowtime column is reset to the insertion 
time. In this case, time-versioned joins produce an invalid result, as the deletion 
is applied immediately instead of at the original deletion timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Releasing Flink 1.13.3

2021-09-30 Thread Timo Walther

Thanks Chesnay and Leonard for helping here.

From the SQL primary key issue side, all backports are merged. Feel 
free to proceed with the release.


Regards,
Timo


On 30.09.21 11:29, Chesnay Schepler wrote:

I can help Leonard with the PMC bits for the 1.13.3 release.

On 27/09/2021 21:26, Martijn Visser wrote:

Hi Timo,

Sounds good to me.

However, I still need a PMC member to make the release. I definitely 
volunteer

to help out with keeping track of things, communication etc. Hopefully
there's a PMC who can help.

Best regards,

Martijn

On Mon, 27 Sep 2021 at 11:32, Timo Walther wrote: 


Hi Martijn,

we are currently in the process of backporting a couple of PRs to the
1.13 branch to fix the partially broken primary key support in Flink
SQL. See FLINK-20374 for more information.

I hope we can finalize this in one or two days. It should be completed
in 1.13.3 to avoid confusion from backported commits that would otherwise
spread across multiple releases.

Thanks,
Timo

On 24.09.21 07:44, Yun Tang wrote:

Hi Martijn,

Thanks for your reminder of FLINK-23519[1]. This ticket has been merged
into release-1.13 but not yet into release-1.14, as we have not
released Flink 1.14.0 officially.

That's why the ticket is still in status IN-PROGRESS, and I think this
would not block the release of flink-1.13.3.


[1] https://issues.apache.org/jira/browse/FLINK-23519

Best
Yun Tang

From: Leonard Xu 
Sent: Friday, September 24, 2021 10:22
To: +dev 
Subject: Re: [DISCUSS] Releasing Flink 1.13.3



My conclusion would be that there are no tickets open right now which

would

block a 1.13.3 release. I only need a PMC member who would like to

manage

the release. Anyone?

Hello, Martijn

I am willing to help release 1.13.3 if possible, but maybe I need some

assistance from a PMC member.

Best,
Leonard




Best regards,

Martijn

On Thu, 23 Sept 2021 at 15:29, Yang Wang  
wrote:



FYI: I have merged FLINK-24315[1].

[1]. https://issues.apache.org/jira/browse/FLINK-24315

Best,
Yang

Yangze Guo wrote on Thu, Sep 23, 2021 at 11:17 AM:


Thanks for driving this discussion, Martijn. +1 for releasing Flink

1.13.3.

In addition to the issues already listed,
https://issues.apache.org/jira/browse/FLINK-24005 is also an

important

fix.

Regarding FLINK-24315, @Yang Wang will help with the review. It is
likely to be merged this week, if not, I don't think it should block
the release.

Best,
Yangze Guo


On Wed, Sep 22, 2021 at 8:44 PM Konstantin Knauf 
wrote:

Hi Martijn,

Thanks for starting the discussion. +1 for a soon Flink 1.13.3. 
IMHO

we

don't need to block the release on FLINK-23519 (an improvement),
FLINK-21853 (seems like a Minor issue with a test) or FLINK-23946

(depends

on https://issues.apache.org/jira/browse/FLINK-23946, which is not

started

yet). This would leave FLINK-24315, which I can't judge at all.

Cheers,

Konstantin

On Wed, Sep 22, 2021 at 1:34 PM Martijn Visser <

mart...@ververica.com>

wrote:


Hi all,

I would like to start discussing releasing Flink 1.13.3. There are
currently 138 tickets already resolved for 1.13.3, a few important

fixes

are:

* https://issues.apache.org/jira/browse/FLINK-24347 (KafkaSource

cannot

checkpoint if the parallelism is higher than the partition number)
* https://issues.apache.org/jira/browse/FLINK-24303

(SourceCoordinator

exception may fail Session Cluster)
* https://issues.apache.org/jira/browse/FLINK-24277 (Offset commit

should

be disabled if consumer group ID is not specified in KafkaSource)
* https://issues.apache.org/jira/browse/FLINK-23802 (Reduce
ReadTimeoutExceptions for Kinesis Consumer)

There are currently 4 tickets with fix version 1.13.3 that are in

progress:
* https://issues.apache.org/jira/browse/FLINK-24315 (Cannot 
rebuild

watcher
thread while the K8S API server is unavailable)
* https://issues.apache.org/jira/browse/FLINK-23946 (Application

mode

fails
fatally when being shut down)
* https://issues.apache.org/jira/browse/FLINK-23519 (Aggregate

State

Backend Latency by State Level)
* https://issues.apache.org/jira/browse/FLINK-21853 (Running HA

per-job
cluster (rocks, non-incremental) end-to-end test could not 
finished

in

900

seconds)

Can Yangze, David, Yun and Till give an update on the status for

those?
Are there any other open tickets that we should wait for? Is 
there a

PMC

member who would like to manage the release?

Best regards,

Martijn Visser | Product Manager

mart...@ververica.com

<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time



--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk



--

Martijn Visser | Product Manager

mart...@ververica.com

<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time







[jira] [Created] (FLINK-24427) Hide Scala in flink-table-planner from API

2021-09-30 Thread Timo Walther (Jira)
Timo Walther created FLINK-24427:


 Summary: Hide Scala in flink-table-planner from API
 Key: FLINK-24427
 URL: https://issues.apache.org/jira/browse/FLINK-24427
 Project: Flink
  Issue Type: Sub-task
  Components: API / Scala, Table SQL / API, Table SQL / Planner
Reporter: Timo Walther


FLIP-32 has decoupled the planner from the API. However, the planner code base 
is still the largest Scala code base we have in Flink that we cannot port to 
Java easily.

In order to allow arbitrary Scala versions in the user API, we suggest to hide 
the Scala version of the planner from the Scala version of the API. The API is 
mostly developed in Java and contains only a couple of classes implemented in 
Scala. Those should be easier to maintain for various Scala versions. The 
planner is already discovered via Java SPI.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24399) Make handling of DataType less verbose

2021-09-28 Thread Timo Walther (Jira)
Timo Walther created FLINK-24399:


 Summary: Make handling of DataType less verbose
 Key: FLINK-24399
 URL: https://issues.apache.org/jira/browse/FLINK-24399
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Timo Walther


{{DataType}} is the API abstraction for declaring types and always includes the 
{{LogicalType}} as well. In order to ease the handling of this class, we should 
do the following additions:

- Add a {{DynamicTableFactory.Context#getPhysicalRowDataType(): DataType}}
- Add a {{DynamicTableFactory.Context#getPrimaryKey: Optional}}
- Add a {{DataType#getFieldNames: List<String>}} (empty for atomics)
- Add a {{DataType#getFieldDataTypes: List<DataType>}} as a synonym for 
{{getChildren}} (empty for atomics)
- Add a {{DataType#getFieldCount: int}} (0 for atomics)

This should simplify implementations and avoid the need for internal utilities.
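
For illustration, a minimal sketch of the intended simplification. The direct helpers on {{DataType}} are the *proposed* additions from this ticket and do not exist yet; the "today" path shows the current LogicalType-based access:

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.List;

public class DataTypeAccessSketch {
    public static void main(String[] args) {
        DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.BIGINT()),
                DataTypes.FIELD("name", DataTypes.STRING()));

        // Today: go through the LogicalType to obtain field names.
        List<String> fieldNames = ((RowType) rowType.getLogicalType()).getFieldNames();
        System.out.println(fieldNames);

        // Proposed in this ticket (does not exist yet): direct helpers on DataType.
        // List<String> names = rowType.getFieldNames();
        // int fieldCount = rowType.getFieldCount();
    }
}
{code}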



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: The Apache Flink should pay more attention to ensuring API compatibility.

2021-09-28 Thread Timo Walther
I opened https://issues.apache.org/jira/browse/FLINK-24396 to track this 
effort.


Not sure if this will happen in 1.15 already. We will need automated 
compatibility tests and a well-defined list of stable APIs.


We can also do this incrementally and start with the interfaces for 
connectors.


Regards,
Timo


On 28.09.21 11:47, Leonard Xu wrote:

Thanks @peninx for the feedback, this will definitely help the flink community.

Recently, we also developed a series of connectors in Flink CDC project[1]. 
They are based on flink version 1.13.1, but many users still use flink version 
1.12.* in production. They have encountered similar problems, and it is 
difficult to upgrade the flink cluster version within their company.

Therefore, things like directly changing CatalogTable to ResolvedCatalogTable 
should not happen; we should have marked it as @Deprecated and kept it for at 
least one version for compatibility.

In one word, it's valuable feedback that we will pay more attention to API 
compatibility.

Best,
Leonard

[1] https://github.com/ververica/flink-cdc-connectors


在 2021年9月28日,17:16,Jeff Zhang  写道:

I believe I mentioned this before in the community, we (Zeppelin) use flink
api as well and would like to support multiple versions of flink in one
zeppelin version. For now we have to use reflection to achieve that.

https://github.com/apache/zeppelin/tree/master/flink


OpenInx  于2021年9月28日周二 下午5:10写道:


Thanks for the information, Martijn & Timo!


Since implementing a connector is not straightforward, we were expecting
that not many users implement custom connectors.

Currently, Apache Iceberg and Hudi depend heavily on the PublicEvolving
API for their Flink connectors. I think Apache Hudi even uses more public
API than Iceberg to implement its relatively complicated Flink sink DAG;
Danny Chen [1] may want to provide more input. API compatibility has become
one of the core reasons that downstream project maintainers vote to support
a release or not, because bandwidth in the downstream projects is limited
and we maintainers need to balance between community requirements and cost.
A highly compatible Flink release greatly reduces the maintenance cost
(especially since Flink releases often), and we are also glad to support it
for a longer life cycle.


We therefore consider this part as a kind of "second level API" for which
we can evolve quicker.

That sounds great ! I'm glad to see that we are making the API more
friendly !

[1]. https://github.com/danny0405



On Tue, Sep 28, 2021 at 3:52 PM Timo Walther  wrote:


Hi Zheng,

I'm very sorry for the inconvenience that we have caused with our API
changes. We are trying our best to avoid API breaking changes. Thanks
for giving us feedback.

There has been a reason why Table API was marked as @PublicEvolving
instead of @Public. Over the last two years, we have basically rewritten
the entire API [1] to digest the Blink merge and make the Table API
stable and ready for the future. We tried our best to give users 1-2
releases time to upgrade their implementations whenever we deprecated
API but we were aware that this might cause frustration, but hopefully
for the greater good. We have reworked type system, Catalog API, schema,
source/sinks, functions and much more. Flink 1.14 will hopefully be the
last release with major API changes. We could also mark most Table API
interfaces as `@Public` in 1.15.

For your mentioned incompatibility, I agree that the change from
CatalogTable to ResolvedCatalogTable was not very nice. Since
implementing a connector is not straightforward, we were expecting that
not many users implement custom connectors. We therefore consider this
part as a kind of "second level API" for which we can evolve quicker. A
`context.getCatalogTable().getSchema()` should still work for 1.12 and
1.13, at least that was the goal.

Thanks again for the feedback. It was a good reminder and we will pay
more attention to this.

Regards,
Timo

[1]



https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions



On 28.09.21 08:40, Martijn Visser wrote:

Hi Zheng,

Thanks for reaching out and sharing your frustration. No feelings are

hurt

and feedback is always welcome, because that's the only way we can

improve

for the future. API compatibility is a really important thing for us

while

also improving and building new capabilities. Let me investigate a bit

what

happened on our end, share that and then try to get some learnings out

of

it for the future. I'll get back to you in a couple of days.

Best regards,

Martijn Visser | Product Manager

mart...@ververica.com

<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


On Tue, 28 Sept 2021 at 07:39, OpenInx  wrote:


Sorry about my unfriendly tone of 

[jira] [Created] (FLINK-24396) Add @Public annotations to Table & SQL API

2021-09-28 Thread Timo Walther (Jira)
Timo Walther created FLINK-24396:


 Summary: Add @Public annotations to Table & SQL API
 Key: FLINK-24396
 URL: https://issues.apache.org/jira/browse/FLINK-24396
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Timo Walther


Many parts of the Table & SQL API have stabilized and we can mark them as 
{{@Public}} which gives both users and downstream projects more confidence when 
using Flink.

A concrete list of classes and methods needs to be compiled. Some parts of the 
API might stay {{@PublicEvolving}} for now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: The Apache Flink should pay more attention to ensuring API compatibility.

2021-09-28 Thread Timo Walther

Hi Zheng,

I'm very sorry for the inconvenience that we have caused with our API 
changes. We are trying our best to avoid API breaking changes. Thanks 
for giving us feedback.


There has been a reason why Table API was marked as @PublicEvolving 
instead of @Public. Over the last two years, we have basically rewritten 
the entire API [1] to digest the Blink merge and make the Table API 
stable and ready for the future. We tried our best to give users 1-2 
releases time to upgrade their implementations whenever we deprecated 
API but we were aware that this might cause frustration, but hopefully 
for the greater good. We have reworked type system, Catalog API, schema, 
source/sinks, functions and much more. Flink 1.14 will hopefully be the 
last release with major API changes. We could also mark most Table API 
interfaces as `@Public` in 1.15.


For your mentioned incompatibility, I agree that the change from 
CatalogTable to ResolvedCatalogTable was not very nice. Since 
implementing a connector is not straightforward, we were expecting that 
not many users implement custom connectors. We therefore consider this 
part as a kind of "second level API" for which we can evolve quicker. A 
`context.getCatalogTable().getSchema()` should still work for 1.12 and 
1.13, at least that was the goal.
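
For illustration, a minimal factory sketch that sticks to this access path (the class name and identifier are made up, not actual Iceberg/Hudi code):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

import java.util.Collections;
import java.util.Set;

public class MyConnectorFactory implements DynamicTableSourceFactory {

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Only the CatalogTable/TableSchema API is referenced here,
        // no direct reference to ResolvedCatalogTable.
        TableSchema schema = context.getCatalogTable().getSchema();
        // ... build the actual source from 'schema' and the options (omitted in this sketch)
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public String factoryIdentifier() {
        return "my-connector"; // hypothetical identifier
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}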


Thanks again for the feedback. It was a good reminder and we will pay 
more attention to this.


Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions



On 28.09.21 08:40, Martijn Visser wrote:

Hi Zheng,

Thanks for reaching out and sharing your frustration. No feelings are hurt
and feedback is always welcome, because that's the only way we can improve
for the future. API compatibility is a really important thing for us while
also improving and building new capabilities. Let me investigate a bit what
happened on our end, share that and then try to get some learnings out of
it for the future. I'll get back to you in a couple of days.

Best regards,

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


On Tue, 28 Sept 2021 at 07:39, OpenInx  wrote:


Sorry about my unfriendly tone of the last e-mail, I got frustrated about
the experience of maintaining the project which is closely with Flink. My
intention was trying to remind everyone to be careful about API
compatibility and didn't really watch out for the tone I used.

Hope that doesn't hurt anyone's feelings.

On Tue, Sep 28, 2021 at 12:33 PM OpenInx  wrote:


Hi Dev

We are trying to upgrade the flink version from 1.12.0 to 1.13.2 in

apache

iceberg project ( https://github.com/apache/iceberg/pull/3116),  but

it's

not a great experience.  We expect to support both flink1.12 and

flink1.13

in an iceberg-flink module without using the new API of flink1.13 for
saving maintenance cost,  but we find the iceberg-flink-runtime.jar built
by flink 1.13 cannot works fine in flink 1.12 clusters because of the

basic

API compatibility was break when iterating flink 1.12 to flink1.13.2:

(The following are copied from the iceberg issue:
https://github.com/apache/iceberg/issues/3187#issuecomment-928755046)

Thanks for the report, @Reo-LEI ! I think this issue was introduced from
this apache flink PR (


https://github.com/apache/flink/pull/15316/files#diff-bd276ed951054125b39428ee61de103d9c7832246398f01514a574bb8e51757cR74
)

and FLINK-21913 (https://issues.apache.org/jira/browse/FLINK-21913), it
just changed the returned data type from CatalogTable to
ResolvedCatalogTable without any compatibility guarantee. In this case,

the

iceberg-flink-runtime jar which is compiled from apache flink 1.13 will
include the ResovledCatalogTable class inside it. Finally when we package
this jar and submit the flink job to flink 1.12, the above compatibility
issue happen.

As we all know, the DynamicTableFactory (


https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
)

is a basic API which almost all flink connectors are built on top of it.
The breaking compatibility makes the downstream projects really hard to
deliver better compatibility to users, unless we iceberg maintain

different

modules for each maintained flink version (That's not the thing that we
want to do).

The last flink upgrading work is also not a good experience (See the
discussion (https://github.com/apache/iceberg/pull/1956) and comment (
https://github.com/apache/iceberg/pull/1956#discussion_r546534299) ),
because the flink 1.12 also breaks several API that was annotated
PublicEvolving in flink 1.11.0, that becomes one of the most important
reasons leading to the conclusion that stops support flink 1.11.0 in our
apache 

Re: [DISCUSS] Releasing Flink 1.13.3

2021-09-27 Thread Timo Walther

Hi Martijn,

we are currently in the process of backporting a couple of PRs to the  
1.13 branch to fix the partially broken primary key support in Flink  
SQL. See FLINK-20374 for more information.


I hope we can finalize this in one or two days. It should be completed
in 1.13.3 to avoid confusion from backported commits that would otherwise
spread across multiple releases.


Thanks,
Timo

On 24.09.21 07:44, Yun Tang wrote:

Hi Martijn,

Thanks for your reminder of FLINK-23519[1]. This ticket has already been merged into 
release-1.13 but not yet into release-1.14, as we have not released Flink 1.14.0 
officially.
That's why the ticket is still in status IN-PROGRESS, and I think this would 
not block the release of flink-1.13.3.


[1] https://issues.apache.org/jira/browse/FLINK-23519

Best
Yun Tang

From: Leonard Xu 
Sent: Friday, September 24, 2021 10:22
To: +dev 
Subject: Re: [DISCUSS] Releasing Flink 1.13.3



My conclusion would be that there are no tickets open right now which would
block a 1.13.3 release. I only need a PMC member who would like to manage
the release. Anyone?


Hello Martijn,

I am willing to help release 1.13.3 if possible, but maybe I need some 
assistance from a PMC member.

Best,
Leonard





Best regards,

Martijn

On Thu, 23 Sept 2021 at 15:29, Yang Wang  wrote:


FYI: I have merged FLINK-24315[1].

[1]. https://issues.apache.org/jira/browse/FLINK-24315

Best,
Yang

Yangze Guo  于2021年9月23日周四 上午11:17写道:


Thanks for driving this discussion, Martijn. +1 for releasing Flink

1.13.3.

In addition to the issues already listed,
https://issues.apache.org/jira/browse/FLINK-24005 is also an important
fix.

Regarding FLINK-24315, @Yang Wang will help with the review. It is
likely to be merged this week, if not, I don't think it should block
the release.

Best,
Yangze Guo


On Wed, Sep 22, 2021 at 8:44 PM Konstantin Knauf 
wrote:


Hi Martijn,

Thanks for starting the discussion. +1 for a soon Flink 1.13.3. IMHO we
don't need to block the release on FLINK-23519 (an improvement),
FLINK-21853 (seems like a Minor issue with a test) or FLINK-23946 (depends
on https://issues.apache.org/jira/browse/FLINK-23946, which is not started
yet). This would leave FLINK-24315, which I can't judge at all.

Cheers,

Konstantin

On Wed, Sep 22, 2021 at 1:34 PM Martijn Visser 
wrote:


Hi all,

I would like to start discussing releasing Flink 1.13.3. There are
currently 138 tickets already resolved for 1.13.3, a few important fixes
are:

* https://issues.apache.org/jira/browse/FLINK-24347 (KafkaSource cannot
checkpoint if the parallelism is higher than the partition number)
* https://issues.apache.org/jira/browse/FLINK-24303 (SourceCoordinator
exception may fail Session Cluster)
* https://issues.apache.org/jira/browse/FLINK-24277 (Offset commit should
be disabled if consumer group ID is not specified in KafkaSource)
* https://issues.apache.org/jira/browse/FLINK-23802 (Reduce
ReadTimeoutExceptions for Kinesis Consumer)

There are currently 4 tickets with fix version 1.13.3 that are in
progress:

* https://issues.apache.org/jira/browse/FLINK-24315 (Cannot rebuild
watcher thread while the K8S API server is unavailable)
* https://issues.apache.org/jira/browse/FLINK-23946 (Application mode
fails fatally when being shut down)
* https://issues.apache.org/jira/browse/FLINK-23519 (Aggregate State
Backend Latency by State Level)
* https://issues.apache.org/jira/browse/FLINK-21853 (Running HA per-job
cluster (rocks, non-incremental) end-to-end test could not finished in
900 seconds)

Can Yangze, David, Yun and Till give an update on the status for those?
Are there any other open tickets that we should wait for? Is there a PMC
member who would like to manage the release?

Best regards,

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time




--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk











[jira] [Created] (FLINK-24254) Support hints configuration hints

2021-09-10 Thread Timo Walther (Jira)
Timo Walther created FLINK-24254:


 Summary: Support hints configuration hints 
 Key: FLINK-24254
 URL: https://issues.apache.org/jira/browse/FLINK-24254
 Project: Flink
  Issue Type: Bug
Reporter: Timo Walther






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24186) Disable single rowtime column check for collect/print

2021-09-07 Thread Timo Walther (Jira)
Timo Walther created FLINK-24186:


 Summary: Disable single rowtime column check for collect/print
 Key: FLINK-24186
 URL: https://issues.apache.org/jira/browse/FLINK-24186
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


As seen in FLINK-23751, the single rowtime column check can also occur during 
collecting and printing, where it is not important as watermarks are not used.

The exception is also misleading as it references a {{DataStream}}:
{code:java}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Found more than one rowtime field: 
[bidtime, window_time] in the query when insert into 
'default_catalog.default_database.Unregistered_Collect_Sink_8'.
Please select the rowtime field that should be used as event-time timestamp for 
the DataStream by casting all other fields to TIMESTAMP.
{code}
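
Until this is addressed, a sketch of the workaround suggested by the error message, assuming an existing table environment; the table and column names are illustrative:

{code:java}
import org.apache.flink.table.api.TableEnvironment;

public class RowtimeCollectWorkaroundSketch {
    // Keep a single rowtime column and cast the other time attribute to a plain
    // TIMESTAMP before collecting/printing.
    public static void collect(TableEnvironment tableEnv) {
        tableEnv.executeSql(
            "SELECT bidtime, CAST(window_time AS TIMESTAMP(3)) AS window_time "
                + "FROM Bid").print();
    }
}
{code}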



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Automated architectural tests

2021-09-02 Thread Timo Walther

Hi Ingo,

thanks for starting this discussion. Having more automation is 
definitely desirable. Esp. in the API / SDK areas where we frequently 
have to add similar comments to PRs. The more checks the better. We 
definitely also need more guidelines (e.g. how to develop a Flink 
connector) but automation is safer than long checklists that might be 
out of date quickly.


+1 to the proposal. I don't have an opinion on the tool though.

Regards,
Timo


On 01.09.21 11:03, Ingo Bürk wrote:

Hello everyone,

I would like to start a discussion on introducing automated tests for more
architectural rather than stilistic topics. For example, here are a few
things that seem worth checking to me (this is Table-API-focused since it
is the subsystem I'm involved in):

(a) All classes in o.a.f.table.api should be annotated with one
of @Internal, @PublicEvolving, or @Public.
(b) Classes whose name ends in *ConnectorOptions should be located in
o.a.f.connector.*.table
(c) Classes implementing DynamicSourceFactory / DynamicSinkFactory should
have no static members of type ConfigOption

There are probably significantly more cases worth checking, and also more
involved ones (these are rather simple examples), like disallowing access
between certain packages etc. There are two questions I would like to ask
to the community:

(1) Do you think such tests are useful in general?
(2) What use cases come to mind for you?

If the idea finds consensus, I would like to use (2) to investigate which
tooling to use. An obvious candidate is Checkstyle, as this is already
used. It also has the advantage of being well integrated in the IDE.
However, it is limited to looking at single files only, and custom checks
are pretty complicated and involved to implement[1]. Another possible tool
is ArchUnit[2], which would be significantly easier to maintain and is more
powerful, but in turn requires tests to be executed. If you have further
suggestions (or thoughts) they would of course also be quite welcome,
though for now I would focus on (1) and (2) and go from there to evaluate.

[1] https://checkstyle.sourceforge.io/writingchecks.html
[2] https://www.archunit.org/


Best
Ingo





[jira] [Created] (FLINK-24116) PRIMARY KEY declaration on insert-only table is not validated globally

2021-09-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-24116:


 Summary: PRIMARY KEY declaration on insert-only table is not 
validated globally
 Key: FLINK-24116
 URL: https://issues.apache.org/jira/browse/FLINK-24116
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Timo Walther


In the regular Kafka connector, we validate that a primary key is not defined 
on an insert-only table source:

https://github.com/apache/flink/pull/13850/commits/c1d4dc89fa629e0f04102e7ab2fa94984cd98218

However, this should be validated for every connector because it is confusing 
if it works for others. We should validate this in the planner instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24103) Create time-based LAST_VALUE / FIRST_VALUE

2021-09-01 Thread Timo Walther (Jira)
Timo Walther created FLINK-24103:


 Summary: Create time-based LAST_VALUE / FIRST_VALUE
 Key: FLINK-24103
 URL: https://issues.apache.org/jira/browse/FLINK-24103
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Timo Walther


LAST_VALUE and FIRST_VALUE don't support merging. As far as I can see, 
FLINK-20110 tries to solve this by using nano second timestamps internally. 
However, an easier and consistent approach could be to allow a time parameter 
in the signature:

{code}
LAST_VALUE(timestamp, value)
FIRST_VALUE(timestamp, value)
{code}

This allows merging based on a timestamp in HOP or SESSION windows.
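
A hypothetical usage sketch of the proposed signature (the two-argument LAST_VALUE does not exist in Flink today; table and column names are made up):

{code:java}
import org.apache.flink.table.api.TableEnvironment;

public class TimeBasedLastValueSketch {
    public static void query(TableEnvironment tEnv) {
        // LAST_VALUE(timestamp, value) is the *proposed* form from this ticket.
        // The explicit timestamp makes merging in HOP/SESSION windows well-defined.
        tEnv.executeSql(
            "SELECT user_id, "
                + "  SESSION_START(ts, INTERVAL '10' MINUTE) AS session_start, "
                + "  LAST_VALUE(ts, payload) AS last_payload "
                + "FROM events "
                + "GROUP BY user_id, SESSION(ts, INTERVAL '10' MINUTE)");
    }
}
{code}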



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24087) Kafka connector StartupMode requires a table dependency

2021-08-31 Thread Timo Walther (Jira)
Timo Walther created FLINK-24087:


 Summary: Kafka connector StartupMode requires a table dependency
 Key: FLINK-24087
 URL: https://issues.apache.org/jira/browse/FLINK-24087
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Table SQL / Ecosystem
Reporter: Timo Walther


Table dependencies are marked as {{optional}} for the Kafka connector. We 
should move the {{StartupMode.java#L75}} to {{KafkaConnectorOptionsUtil}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24054) Let SinkUpsertMaterializer emit +U instead of only +I

2021-08-30 Thread Timo Walther (Jira)
Timo Walther created FLINK-24054:


 Summary: Let SinkUpsertMaterializer emit +U instead of only +I
 Key: FLINK-24054
 URL: https://issues.apache.org/jira/browse/FLINK-24054
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Timo Walther
Assignee: Timo Walther


Currently, {{SinkUpsertMaterializer}} is not able to emit +U's but will always 
emit +I's. Thus, resulting changelogs are incorrect strictly speaking and only 
valid when treating +U and +I as similar changes in downstream operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Support decimal types with larger precisions

2021-08-30 Thread Timo Walther

Hi Xingcan,

in theory there should be no hard blocker for supporting this. The 
implementation should be flexible enough at most locations. We just 
adopted 38 from the Blink code base which adopted it from Hive.


However, this could be a breaking change for existing pipelines and we 
would need to offer a flag to bring back the old behavior. It would 
definitely lead to a lot of testing work to not cause inconsistencies.


Do you think this is a hard blocker for users?

Regards,
Timo


On 28.08.21 00:21, Xingcan Cui wrote:

Hi all,

Recently, I was trying to load some CDC data from Oracle/Postgres databases
and found that the current precision range [1, 38] for DecimalType may not
meet the requirement for some source types. For instance, in Oracle, if a
column is declared as `NUMBER` without precision and scale, the values in
it could potentially be very large. As DecimalType is backed by Java
BigDecimal, I wonder if we should extend the precision range.

Best,
Xingcan





[jira] [Created] (FLINK-24033) Propagate unique keys for fromChangelogStream

2021-08-27 Thread Timo Walther (Jira)
Timo Walther created FLINK-24033:


 Summary: Propagate unique keys for fromChangelogStream
 Key: FLINK-24033
 URL: https://issues.apache.org/jira/browse/FLINK-24033
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


Similar to FLINK-23915, we are not propagating unique keys for 
{{fromChangelogStream}} because the information is not written into the statistics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23920) SchemaTranslator loses primary key if schema is inferred

2021-08-23 Thread Timo Walther (Jira)
Timo Walther created FLINK-23920:


 Summary: SchemaTranslator loses primary key if schema is inferred
 Key: FLINK-23920
 URL: https://issues.apache.org/jira/browse/FLINK-23920
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Schema derivation with primary key does not work correctly as the primary key 
is not propagated:

{code}
tableEnv.toChangelogStream(...,
Schema.newBuilder().primaryKey("key").build(),
ChangelogMode.upsert())
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23915) Propagate unique keys for temporary tables

2021-08-23 Thread Timo Walther (Jira)
Timo Walther created FLINK-23915:


 Summary: Propagate unique keys for temporary tables
 Key: FLINK-23915
 URL: https://issues.apache.org/jira/browse/FLINK-23915
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


{{DatabaseCalciteSchema#100}} only propagates unique keys if the table is not 
temporary. This makes primary keys unusable in many cases.

We should fix FLINK-15123 on the way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Projection pushdown for metadata columns

2021-08-23 Thread Timo Walther

Hi everyone,

this sounds definitely like a bug to me. Computing metadata might be 
very expensive and a connector might expose a long list of metadata 
keys. It was therefore intended to project the metadata if possible. I'm 
pretty sure that this worked before (at least when implementing 
SupportsProjectionPushDown). Maybe a bug was introduced when adding the 
Spec support.


Regards,
Timo


On 23.08.21 08:24, Ingo Bürk wrote:

Hi Jingsong,

thanks for your answer. Even if the source implements
SupportsProjectionPushDown, #applyProjections will never be called with
projections for metadata columns. For example, I have the following test:

@Test
def test(): Unit = {
   val tableId = TestValuesTableFactory.registerData(Seq())

   tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
 .schema(Schema.newBuilder()
   .column("f0", DataTypes.INT())
   .columnByMetadata("m1", DataTypes.STRING())
   .columnByMetadata("m2", DataTypes.STRING())
   .build())
 .option("data-id", tableId)
 .option("bounded", "true")
 .option("readable-metadata", "m1:STRING,m2:STRING")
 .build())

   tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
}

Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
is always called with m1 + m2, and #applyProjections only ever sees f0. So
as far as I can tell, the source has no way of knowing which metadata
columns are actually needed (under the projection), it always has to
produce metadata for all metadata columns declared in the table's schema.

In PushProjectIntoTableSourceScanRule I also haven't yet found anything
that would suggest that metadata are first projected and only then pushed
to the source. I think the correct behavior should be to call
#applyReadableMetadata only after they have been considered in the
projection.


Best
Ingo


On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li  wrote:


Hi,

I remember the projection only works with SupportsProjectionPushDown.

You can take a look at
`PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.

applyReadableMetadata will be applied again in the PushProjectIntoTableSourceScanRule.

But there may be a bug in
PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:

if (!usedMetadataNames.isEmpty()) {
    sourceAbilitySpecs.add(
        new ReadingMetadataSpec(usedMetadataNames, newProducedType));
}

If there is no meta column left, we should apply again: we should tell
the source that no meta columns remain after projection.

Best,
Jingsong

On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk  wrote:


Hi everyone,

according to the SupportsReadableMetadata interface, the planner is
supposed to project required metadata columns prior to applying them:


The planner will select required metadata columns (i.e. perform
projection push down) and will call applyReadableMetadata(List, DataType)
with a list of metadata keys.

However, from my experiments it seems that this is not true: regardless of
what columns I select from a table, #applyReadableMetadata always seems to
be called with all metadata declared in the schema of the table. Metadata
columns are also excluded from SupportsProjectionPushDown#applyProjection,
so the source cannot perform the projection either.

This is in Flink 1.13.2. Am I misreading the docs here or is this not
working as intended?


Best
Ingo




--
Best, Jingsong Lee







[jira] [Created] (FLINK-23895) Upsert materializer is not inserted for all sink providers

2021-08-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-23895:


 Summary: Upsert materializer is not inserted for all sink providers
 Key: FLINK-23895
 URL: https://issues.apache.org/jira/browse/FLINK-23895
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


The new {{SinkUpsertMaterializer}} is not inserted for 
{{TransformationSinkProvider}} or {{DataStreamSinkProvider}} which means that 
neither {{toChangelogStream}} nor the current {{KafkaDynamicSink}} work 
correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23834) Test StreamTableEnvironment batch mode manually

2021-08-17 Thread Timo Walther (Jira)
Timo Walther created FLINK-23834:


 Summary: Test StreamTableEnvironment batch mode manually
 Key: FLINK-23834
 URL: https://issues.apache.org/jira/browse/FLINK-23834
 Project: Flink
  Issue Type: Improvement
Reporter: Timo Walther


Test a program that mixes DataStream API and Table API batch mode. Including 
some connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23832) Add documentation for batch mode in StreamTableEnvironment

2021-08-17 Thread Timo Walther (Jira)
Timo Walther created FLINK-23832:


 Summary: Add documentation for batch mode in StreamTableEnvironment
 Key: FLINK-23832
 URL: https://issues.apache.org/jira/browse/FLINK-23832
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


The DataStream API Integration page needs an update.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23768) Test StreamTableEnvironment batch mode in Python

2021-08-13 Thread Timo Walther (Jira)
Timo Walther created FLINK-23768:


 Summary: Test StreamTableEnvironment batch mode in Python
 Key: FLINK-23768
 URL: https://issues.apache.org/jira/browse/FLINK-23768
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Table SQL / API
Reporter: Timo Walther


FLINK-20897 enabled batch mode for {{StreamTableEnvironment}}. We should make 
sure that the Python API works as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23707) Use consistent managed memory weights for StreamNode

2021-08-10 Thread Timo Walther (Jira)
Timo Walther created FLINK-23707:


 Summary: Use consistent managed memory weights for StreamNode
 Key: FLINK-23707
 URL: https://issues.apache.org/jira/browse/FLINK-23707
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


Managed memory that is declared on transformations via 
{{Transformation#declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase
 managedMemoryUseCase, int weight)}} should be declared using a weight.

Usually, a weight should be some kind of factor; however, the table planner 
uses it as a kibibyte value. This causes issues on the DataStream API side, 
which sets it to {{1}} in 
{{org.apache.flink.streaming.runtime.translators.BatchExecutionUtils#applyBatchExecutionSettings}}.
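
For reference, a small sketch of the declaration path in question; the weight value here is just an illustrative relative factor:

{code:java}
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.datastream.DataStream;

public class ManagedMemoryWeightSketch {
    // Declares operator-scope managed memory on the underlying Transformation of a
    // DataStream operator. The second argument is meant to be a relative weight,
    // not a kibibyte value.
    public static void declare(DataStream<?> operatorOutput) {
        operatorOutput.getTransformation()
            .declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
    }
}
{code}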



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-173: Support DAG of algorithms (Flink ML)

2021-08-10 Thread Timo Walther

Hi everyone,

I'm not deeply involved in the discussion but I quickly checked out the 
proposed interfaces because it seems they are using Table API heavily 
and would like to leave some feedback here:


I have the feeling that the proposed interfaces are a bit too simplified.

Methods like `Table[] transform(Table... inputs)` are very difficult to 
handle because they involve a lot of array index magic for implementers 
and users. Also the examples are hard to read because of all the index 
arithmetic going on:



Table output =
    transformer7.transform(
        transformer6.transform(
            transformer5.transform(
                transformer4.transform(
                    transformer3.transform(
                        transformer2.transform(input2)[0],
                        transformer1.transform(input1)[0]
                    )[0]
                )[0]
            )[0]
        )[0]
    )[0];



Table[] compute(Table... inputs) {
    Table output1 = new AOp(...).compute(inputs[0])[0];
    Table output2 = new AOp(...).compute(inputs[1])[0];
    return new BTrainOp(...).compute(output1, output2);
}


Especially for larger pipelines, it will be difficult to distinguish 
between main output, statistics and other side outputs.


Wouldn't it be better to introduce a new concept (maybe even on Table 
API level) to express a modular API operator that takes and returns 
multiple tables? Ideally, those parameters and results would be named 
and/or tagged such that the following operator can easily distinguish 
the different result tables and pick what is needed.
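
To make this concrete, a purely hypothetical sketch (no such interface exists today) of what named inputs and outputs could look like:

import org.apache.flink.table.api.Table;

import java.util.Map;

// Hypothetical only: operators exchange named tables instead of positional arrays,
// so a downstream operator can pick e.g. "model" or "statistics" by name instead
// of relying on array indices.
public interface NamedTableOperator {
    Map<String, Table> apply(Map<String, Table> namedInputs);
}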


That would make the interfaces a bit more complicated but help 
standardizing the communication between modular operators.


Of course this would need a separate design discussion, but non-ML users 
of the Table API could also benefit from it.


Regards,
Timo


On 10.08.21 07:28, Dong Lin wrote:

Thank you Mingliang for providing the comments.

Currently option-1 proposes Graph/GraphModel/GraphBuilder to build an
Estimator from a graph of Estimator/Transformer, where Estimator could
generate the model (as a Transformer) directly. On the other hand, option-2
proposes AlgoOperator that can be linked into a graph of AlgoOperator.

It seems that option-1 is closer to what TF does than option-2. Could you
double check whether you mean option-1 or option-2?




On Tue, Aug 10, 2021 at 11:29 AM 青雉(祁明良)  wrote:


Vote for option 2.
It is similar to what we are doing with Tensorflow.
1. Define the graph in training phase
2. Export model with different input/output spec for online inference

Thanks,
Mingliang

On Aug 10, 2021, at 9:39 AM, Becket Qin > wrote:

estimatorInputs



This communication may contain privileged or other confidential
information of Red. If you have received it in error, please advise the
sender by reply e-mail and immediately delete the message and any
attachments without copying or disclosing the contents. Thank you.





Re: Incompatible RAW types in Table API

2021-08-09 Thread Timo Walther

Sorry, I meant "will be deprecated in Flink 1.14"

On 09.08.21 19:32, Timo Walther wrote:

Hi Dominik,

`toAppendStream` is soft deprecated in Flink 1.13 and will be deprecated 
in Flink 1.13. It uses the old type system and might not match perfectly 
with the other reworked type system in new functions and sources.


For SQL, a lot of Avro classes need to be treated as RAW types. But we 
might address this issue soon and further improve Avro support.


I would suggest to continue this discussion in a JIRA issue. Can you 
also share the code for `NewEvent` and the Avro schema or generated Avro 
class for `Event` to have a fully reproducible example?


What can help is to explicitly define the types:

E.g. you can also use `DataTypes.of(TypeInformation)` both in 
`ScalarFunction.getTypeInference` and 
`StreamTableEnvironment.toDataStream()`.
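
For example, a small sketch (assuming `Event` is the Avro-generated class from this thread):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;

public class ExplicitRawTypeSketch {
    // Derive the DataType (and thus the serializer) explicitly from TypeInformation,
    // so both sides of the conversion agree on the same RAW type.
    public static DataStream<Event> toEventStream(StreamTableEnvironment tableEnv, Table table) {
        DataType eventType = DataTypes.of(TypeInformation.of(Event.class));
        return tableEnv.toDataStream(table, eventType);
    }
}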


I hope this helps.

Timo

On 09.08.21 16:27, Dominik Wosiński wrote:

It should be `id` instead of `licence` in the error, I've copy-pasted it
incorrectly :<

I've also tried additional thing, i.e. creating the ScalarFunction that
does mapping of one avro generated enum to additional avro generated 
enum:


@FunctionHint(
   input = Array(
 new DataTypeHint(value = "RAW", bridgedTo = classOf[OneEnum])
   ),
   output = new DataTypeHint(value = "RAW", bridgedTo = 
classOf[OtherEnum])

)
class EnumMappingFunction extends ScalarFunction {

   def eval(event: OneEnum): OtherEnum = {OtherEnum.DEFAULT_VALUE}
}

This results in the following error:



*Invalid argument type at position 0. Data type RAW('org.test.OneEnum',
'...') expected but RAW('org.test.OneEnum', '...') passed.*


pon., 9 sie 2021 o 15:13 Dominik Wosiński  napisał(a):


Hey all,

I think I've hit some weird issue in Flink TypeInformation generation. I
have the following code:

val stream: DataStream[Event] = ...
tableEnv.createTemporaryView("TableName",stream)
val table = tableEnv
.sqlQuery("SELECT id, timestamp, eventType from TableName")
tableEnvironment.toAppendStream[NewEvent](table)

In this particular example *Event* is an Avro-generated class and 
*NewEvent* is just a POJO. This is just a toy example so please ignore the 
fact that this operation doesn't make much sense.

When I try to run the code I am getting the following error:





*org.apache.flink.table.api.ValidationException: Column types of query
result and sink for unregistered table do not match.Cause: Incompatible
types for sink column 'licence' at position 0.Query schema: [id:
RAW('org.apache.avro.util.Utf8', '...'), timestamp: BIGINT NOT NULL, 
kind:

RAW('org.test.EventType', '...')]*

*Sink schema:  id: RAW('org.apache.avro.util.Utf8', '?'), timestamp:
BIGINT, kind: RAW('org.test.EventType', '?')]*

So, it seems that the type is recognized correctly but for some reason
there is still a mismatch according to Flink, maybe because a different
type serializer is used?

Thanks in advance for any help,
Best Regards,
Dom.












Re: Incompatible RAW types in Table API

2021-08-09 Thread Timo Walther

Hi Dominik,

`toAppendStream` is soft deprecated in Flink 1.13 and will be deprecated 
in Flink 1.13. It uses the old type system and might not match perfectly 
with the other reworked type system in new functions and sources.


For SQL, a lot of Avro classes need to be treated as RAW types. But we 
might address this issue soon and further improve Avro support.


I would suggest to continue this discussion in a JIRA issue. Can you 
also share the code for `NewEvent` and the Avro schema or generated Avro 
class for `Event` to have a fully reproducible example?


What can help is to explicitly define the types:

E.g. you can also use `DataTypes.of(TypeInformation)` both in 
`ScalarFunction.getTypeInference` and 
`StreamTableEnvironment.toDataStream()`.


I hope this helps.

Timo

On 09.08.21 16:27, Dominik Wosiński wrote:

It should be `id` instead of `licence` in the error, I've copy-pasted it
incorrectly :<

I've also tried additional thing, i.e. creating the ScalarFunction that
does mapping of one avro generated enum to additional avro generated enum:

@FunctionHint(
   input = Array(
 new DataTypeHint(value = "RAW", bridgedTo = classOf[OneEnum])
   ),
   output = new DataTypeHint(value = "RAW", bridgedTo = classOf[OtherEnum])
)
class EnumMappingFunction extends ScalarFunction {

   def eval(event: OneEnum): OtherEnum = {OtherEnum.DEFAULT_VALUE}
}

This results in the following error:



*Invalid argument type at position 0. Data type RAW('org.test.OneEnum',
'...') expected but RAW('org.test.OneEnum', '...') passed.*


pon., 9 sie 2021 o 15:13 Dominik Wosiński  napisał(a):


Hey all,

I think I've hit some weird issue in Flink TypeInformation generation. I
have the following code:

val stream: DataStream[Event] = ...
tableEnv.createTemporaryView("TableName",stream)
val table = tableEnv
.sqlQuery("SELECT id, timestamp, eventType from TableName")
tableEnvironment.toAppendStream[NewEvent](table)

In this particular example *Event* is an Avro-generated class and *NewEvent*
is just a POJO. This is just a toy example so please ignore the fact that
this operation doesn't make much sense.

When I try to run the code I am getting the following error:





*org.apache.flink.table.api.ValidationException: Column types of query
result and sink for unregistered table do not match.Cause: Incompatible
types for sink column 'licence' at position 0.Query schema: [id:
RAW('org.apache.avro.util.Utf8', '...'), timestamp: BIGINT NOT NULL, kind:
RAW('org.test.EventType', '...')]*

*Sink schema:  id: RAW('org.apache.avro.util.Utf8', '?'), timestamp:
BIGINT, kind: RAW('org.test.EventType', '?')]*

So, it seems that the type is recognized correctly but for some reason
there is still a mismatch according to Flink, maybe because a different type
serializer is used?

Thanks in advance for any help,
Best Regards,
Dom.










[jira] [Created] (FLINK-23663) Reduce state size in ChangelogNormalize through filter push down

2021-08-06 Thread Timo Walther (Jira)
Timo Walther created FLINK-23663:


 Summary: Reduce state size in ChangelogNormalize through filter 
push down
 Key: FLINK-23663
 URL: https://issues.apache.org/jira/browse/FLINK-23663
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Timo Walther


{{ChangelogNormalize}} is an expensive stateful operation as it stores data for 
each key. 

Filters are generally not pushed through a ChangelogNormalize node, which means 
that users have no possibility to at least limit the key space. Pushing a filter 
like {{a < 10}} into a source like {{upsert-kafka}} that is emitting {{+I[key1, 
a=9]}} and {{-D[key1, a=10]}} is problematic, as the deletion will be filtered 
out and lead to wrong results. But limiting the filter push down to the key space 
should be safe.
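
A sketch of the distinction, assuming an {{upsert-kafka}} table {{users}} keyed by {{user_id}} with a non-key column {{a}} (names are illustrative):

{code:java}
import org.apache.flink.table.api.TableEnvironment;

public class KeySpaceFilterSketch {
    public static void queries(TableEnvironment tEnv) {
        // Filter on the primary key: it only restricts the key space, so pushing it
        // below ChangelogNormalize cannot separate an upsert from its matching deletion.
        tEnv.executeSql("SELECT * FROM users WHERE user_id = 42");

        // Filter on a non-key column: pushing it down may drop the -D[key1, a=10]
        // event from the example above and leave a stale row after normalization.
        tEnv.executeSql("SELECT * FROM users WHERE a < 10");
    }
}
{code}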

Furthermore, it seems the current implementation is also wrong as it pushes 
filters through {{ChangelogNormalize}} but only if the source implements filter 
push down.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23646) Use pipeline name consistently across DataStream API and Table API

2021-08-05 Thread Timo Walther (Jira)
Timo Walther created FLINK-23646:


 Summary: Use pipeline name consistently across DataStream API and 
Table API
 Key: FLINK-23646
 URL: https://issues.apache.org/jira/browse/FLINK-23646
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Currently, the pipeline name configured in {{StreamExecutionEnvironment}} is 
not always considered in a table environment. E.g. when using {{executeSql}}. 
In general, the job name code can be simplified by relying on the 
{{StreamGraphGenerator}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23498) Use a layered configuration in Executor and Planner

2021-07-26 Thread Timo Walther (Jira)
Timo Walther created FLINK-23498:


 Summary: Use a layered configuration in Executor and Planner
 Key: FLINK-23498
 URL: https://issues.apache.org/jira/browse/FLINK-23498
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


The configuration story in Flink is not very consistent at the moment. Ideally, 
we should have a layered approach where pipeline executor, DataStream API, and 
Table API store their configuration and add it to a global configuration on 
request. However, this is a big change that we would like to avoid at this 
point.

Instead, we partially follow this approach by adding:
- {{Executor.configure}} to propagate changes to the 
{{StreamExecutionEnvironment}} layer
- {{Executor.getConfiguration}} to access the config of lower layers
- {{Planner.getConfiguration}} to access a global configuration during planning

This is required to access properties stored in 
{{StreamExecutionEnvironment.configuration}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23482) Simplify BlinkExecutorFactory stack

2021-07-23 Thread Timo Walther (Jira)
Timo Walther created FLINK-23482:


 Summary: Simplify BlinkExecutorFactory stack
 Key: FLINK-23482
 URL: https://issues.apache.org/jira/browse/FLINK-23482
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


The {{BlinkExecutorFactory}} stack uses the old table factory stack and is not 
needed anymore as the old planner has been removed. We should simplify the 
logic there.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23470) Use blocking shuffles but pipeline within a slot

2021-07-22 Thread Timo Walther (Jira)
Timo Walther created FLINK-23470:


 Summary: Use blocking shuffles but pipeline within a slot
 Key: FLINK-23470
 URL: https://issues.apache.org/jira/browse/FLINK-23470
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Reporter: Timo Walther


As discussed in FLINK-23402, we would like to introduce a good default shuffle 
mode for batch runtime mode that is a trade-off between all pipelined and all 
blocking shuffles.

From the discussion in FLINK-23402:

For the shuffle modes, I think those three settings are actually sufficient:

1. pipeline all, for batch execution that wants pipelined shuffles. (Still 
batch recovery, no checkpoints, batch operators)
2. batch all, just in case you want to.
3. batch shuffles, pipeline within a slot. (DEFAULT)

This should be the default, and it means we batch whenever a slot has a 
dependency on another slot.

A dependency between slots is:

- any all-to-all connection (keyBy, broadcast, rebalance, random)
- any pointwise connection (rescale)
- any forward between different slot sharing groups
Effectively only FORWARD connections within the same slot sharing group has no 
dependency on another slot.

That mode makes a lot of sense as the default, because it guarantees that we 
can always run the program as long as we have at least one slot. No resource 
starvation ever. But it retains pipelining where we don't chain operators due 
to missing chaining logic (but we still slot-share them).

Compared to this (3) mode, FORWARD_EDGES_PIPELINED and 
POINTWISE_EDGES_PIPELINED are not well-defined.

POINTWISE_EDGES_PIPELINED is a gamble, it only works if you have a certain 
amount of resources, related to the rescale factor. Otherwise the job may fail 
with resource starvation. Hard to understand and debug for users; not a great 
option in my opinion.

FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation 
when the forward connection connects different slot sharing groups.
That's why I would drop those (they make it confusing for users), not reuse the 
GlobalDataExchangeMode, and rather introduce option (3) above, which mostly 
batches the exchanges, except when they are guaranteed to be in the same 
slot.

As a side note: The difference between (3) and (2) should be already relatively 
small in SQL jobs and become smaller over time, as more and more can be chained 
together.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23450) Properties map is not set in DebeziumAvroFormatFactory

2021-07-21 Thread Timo Walther (Jira)
Timo Walther created FLINK-23450:


 Summary: Properties map is not set in DebeziumAvroFormatFactory
 Key: FLINK-23450
 URL: https://issues.apache.org/jira/browse/FLINK-23450
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-21229 did not set the properties map correctly in 
DebeziumAvroFormatFactory.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23426) Support changelog processing in batch mode

2021-07-19 Thread Timo Walther (Jira)
Timo Walther created FLINK-23426:


 Summary: Support changelog processing in batch mode
 Key: FLINK-23426
 URL: https://issues.apache.org/jira/browse/FLINK-23426
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


The DataStream API can execute arbitrary DataStream programs when running in 
batch mode. However, this is not the case for the Table API batch mode. E.g. a 
source with non-insert only changes is not supported and updates/deletes cannot 
be emitted.

In theory, we could make this work by running the "stream mode" of the planner 
(CDC transformations) on top of the "batch mode" of DataStream API (specialized 
state backend, sorted inputs). It is up for discussion if and how we expose 
such functionality.

If we don't allow enabling incremental updates, we can also add a special batch 
operator that materializes the incoming changes for a batch pipeline. However, 
it would require "complete" CDC logs (i.e. no missing UPDATE_AFTER).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Introduction email

2021-07-19 Thread Timo Walther

Hi Srini,

welcome aboard! Great to see more adoption in the SQL space. Looking 
forward to collaboration.


Regards,
Timo

On 19.07.21 10:58, Till Rohrmann wrote:

Hi Srini,

Welcome to the Flink community :-) Great to hear what you are planning to
do with Flink at LinkedIn. I think sharing this is very motivational for
the community and also gives context for what you are focusing on. Looking
forward to working with you and improving Flink.

Cheers,
Till

On Fri, Jul 16, 2021 at 8:36 PM Srinivasulu Punuru 
wrote:


Hi Flink Devs,

I am Srini, and I work for the stream processing team at LinkedIn. LinkedIn is
taking a big bet on Apache Flink and migrating all the existing streaming
SQL apps to Flink. You might have seen mails from some of our team members
over the past few months. Thanks a lot for your support!

I just wanted to Say Hi to everyone before I take up some of the starter
Jiras and start contributing.

Thanks Again! Looking forward to collaboration :)

Here are some of the quick notes about our Flink scenarios.

1. We will be using Flink SQL just for stream processing applications.
2. Most of our current SQL apps are stateless, but stateful SQL
capabilities are one of the reasons we are migrating to Flink. SQL state
management is an area of interest.
3. We also have customers asking for batch and streaming convergence, so
SQL-based batch <-> streaming convergence or engine portability of SQL apps
is an area of interest.
4. We are initially on prem. But LinkedIn as a whole is betting on
Cloud. So taking advantage of some of the cloud capabilities like Storage
compute disaggregation and Elastic compute (for auto-scaling) for Flink
would be interesting.
5. We also provide a managed streaming SQL service, i.e. we manage the
SQL jobs for our developers. So reliability, operability and quick
recovery are critical as well :).

Thanks,
Srini.







Re: [VOTE] Release flink-shaded 14.0, release candidate 1

2021-07-19 Thread Timo Walther

+1 (binding)

I went through all commits one more time and could not spot anything 
that would block a release.


Thanks Chesnay!

Timo

On 15.07.21 09:02, Chesnay Schepler wrote:

Hi everyone,
Please review and vote on the release candidate #1 for the version 14.0, 
as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org 
[2], which are signed with the key with fingerprint C2EED7B111D464BA [3],

* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.


Thanks,
Chesnay

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350408


[2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-14.0-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1435
[5] https://github.com/apache/flink-shaded/commits/release-14.0-rc1
[6] https://github.com/apache/flink-web/pull/458





[jira] [Created] (FLINK-23424) Disable key sorting for Transformations created within table planner

2021-07-19 Thread Timo Walther (Jira)
Timo Walther created FLINK-23424:


 Summary: Disable key sorting for Transformations created within 
table planner
 Key: FLINK-23424
 URL: https://issues.apache.org/jira/browse/FLINK-23424
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


In batch mode, the table planner is fully in control of the generated 
Transformations. So we can disable the key sorting in this case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23402) Expose a consistent GlobalDataExchangeMode

2021-07-15 Thread Timo Walther (Jira)
Timo Walther created FLINK-23402:


 Summary: Expose a consistent GlobalDataExchangeMode
 Key: FLINK-23402
 URL: https://issues.apache.org/jira/browse/FLINK-23402
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Timo Walther


The Table API makes the {{GlobalDataExchangeMode}} configurable via 
{{table.exec.shuffle-mode}}.

In Table API batch mode the StreamGraph is configured with 
{{ALL_EDGES_BLOCKING}} and in DataStream API batch mode 
{{FORWARD_EDGES_PIPELINED}}.

I would vote for unifying the exchange mode of both APIs so that complex SQL 
pipelines behave identically in {{StreamTableEnvironment}} and 
{{TableEnvironment}}. Also, the feedback I got so far would make 
{{ALL_EDGES_BLOCKING}} the safer option to run pipelines successfully with 
limited resources.
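
For reference, a sketch of how the existing Table API option is set today, assuming an existing {{TableEnvironment}}:

{code:java}
import org.apache.flink.table.api.TableEnvironment;

public class ShuffleModeConfigSketch {
    public static void configure(TableEnvironment tEnv) {
        // Existing Table API knob; this ticket proposes a unified DataStream-level
        // option (execution.exchange-mode) that would make it obsolete.
        tEnv.getConfig().getConfiguration()
            .setString("table.exec.shuffle-mode", "ALL_EDGES_BLOCKING");
    }
}
{code}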

[~lzljs3620320]
{noformat}
The previous history was like this:
- The default value is pipeline, and we find that many times due to 
insufficient resources, the deployment will hang. And the typical use of batch 
jobs is small resources running large parallelisms, because in batch jobs, the 
granularity of failover is related to the amount of data processed by a single 
task. The smaller the amount of data, the faster the fault tolerance. So most 
of the scenarios are run with small resources and large parallelisms, little by 
little slowly running.

- Later, we switched the default value to blocking. We found that the better 
blocking shuffle implementation would not slow down the running speed much. We 
tested tpc-ds and it took almost the same time.
{noformat}

[~dwysakowicz]
{noformat}
I don't see a problem with changing the default value for DataStream batch mode 
if you think ALL_EDGES_BLOCKING is the better default option.
{noformat}

In any case, we should make this configurable for DataStream API users and make 
the specific Table API option obsolete.

It would include the following steps:

- Move {{GlobalDataExchangeMode}} from {{o.a.f.streaming.api.graph}} to 
{{o.a.f.api.common}} (with reworked JavaDocs) as {{ExchangeMode}} (to have it 
shorter) next to {{RuntimeMode}}
- Add {{StreamExecutionEnvironment.setExchangeMode()}} next to 
{{setRuntimeMode}}
- Add option {{execution.exchange-mode}}
- Add checks for invalid combinations to StreamGraphGenerator
- Deprecate ExecutionMode ([avoid 
confusion|https://stackoverflow.com/questions/68335472/what-is-difference-in-runtimeexecutionmode-and-executionmode])



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23384) Use upper case name for all BuiltInFunctionDefinitions

2021-07-14 Thread Timo Walther (Jira)
Timo Walther created FLINK-23384:


 Summary: Use upper case name for all BuiltInFunctionDefinitions
 Key: FLINK-23384
 URL: https://issues.apache.org/jira/browse/FLINK-23384
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


Currently, some functions in BuiltInFunctionDefinitions use camel case names. 
This is mostly due to historical reasons when using the string-based Java 
expression API. Once {{ExpressionParser}} is dropped we can also normalize the 
names.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23373) Support object reuse disabled in OperatorChain

2021-07-13 Thread Timo Walther (Jira)
Timo Walther created FLINK-23373:


 Summary: Support object reuse disabled in OperatorChain
 Key: FLINK-23373
 URL: https://issues.apache.org/jira/browse/FLINK-23373
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Timo Walther


Currently, object reuse must be enabled in order to use chained sources.

Tests such as `HiveDialectQueryITCase` will fail with an exception:
{code}
2021-07-12T14:47:55.8233741Z Jul 12 14:47:55 [ERROR] 
testQueries(org.apache.flink.connectors.hive.HiveDialectQueryITCase)  Time 
elapsed: 12.283 s  <<< ERROR!
2021-07-12T14:47:55.8234433Z Jul 12 14:47:55 java.lang.RuntimeException: Failed 
to fetch next result
2021-07-12T14:47:55.8235133Z Jul 12 14:47:55at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
2021-07-12T14:47:55.8235958Z Jul 12 14:47:55at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
2021-07-12T14:47:55.8236774Z Jul 12 14:47:55at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)

2021-07-12T14:47:55.8313594Z Jul 12 14:47:55 Caused by: 
java.lang.UnsupportedOperationException: Currently chained sources are 
supported only with objectReuse enabled
2021-07-12T14:47:55.8314356Z Jul 12 14:47:55at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSourceOutput(OperatorChain.java:355)
2021-07-12T14:47:55.8315109Z Jul 12 14:47:55at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSources(OperatorChain.java:322)
2021-07-12T14:47:55.8315820Z Jul 12 14:47:55at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:220)
2021-07-12T14:47:55.8316506Z Jul 12 14:47:55at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:558)
2021-07-12T14:47:55.8317209Z Jul 12 14:47:55at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:661)
2021-07-12T14:47:55.8317948Z Jul 12 14:47:55at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:547)
2021-07-12T14:47:55.8318626Z Jul 12 14:47:55at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
2021-07-12T14:47:55.8319205Z Jul 12 14:47:55at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
2021-07-12T14:47:55.8319725Z Jul 12 14:47:55at 
java.lang.Thread.run(Thread.java:748)
2021-07-12T14:47:55.8320122Z Jul 12 1
{code}

The fix should look as follows:

This particular exception should be rather straightforward to fix. The reason 
it's not implemented is that the chained sources feature was implemented in 
the minimal scope required by the blink planner and is missing around ~50-100 lines 
of production code to work with object reuse disabled.
In {{OperatorChain#createChainedSourceOutput}} we need to do something similar 
to what is done in {{OperatorChain#wrapOperatorIntoOutput}}, so something like:
{code}
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
    // Object reuse enabled: records can be forwarded to the chained input without copying.
    return closer.register(new ChainingOutput(input, metricGroup, outputTag));
} else {
    // Object reuse disabled: copy each record with the input serializer before chaining.
    TypeSerializer inSerializer =
            operatorConfig.getTypeSerializerIn1(userCodeClassloader);
    return closer.register(
            new CopyingChainingOutput(input, inSerializer, metricGroup, outputTag));
}
{code}
The missing part is to make {{CopyingChainingOutput}} work with an 
Input instead of an Operator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23372) Disable AllVerticesInSameSlotSharingGroupByDefault in DataStream batch mode

2021-07-13 Thread Timo Walther (Jira)
Timo Walther created FLINK-23372:


 Summary: Disable AllVerticesInSameSlotSharingGroupByDefault in 
DataStream batch mode
 Key: FLINK-23372
 URL: https://issues.apache.org/jira/browse/FLINK-23372
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Timo Walther
Assignee: Timo Walther


In order to unify the behavior of DataStream API and Table API batch mode, we 
should disable AllVerticesInSameSlotSharingGroupByDefault also in DataStream 
API.

FLINK-20001 reverted setting this flag but without concrete arguments and the 
following comment: {{reconsider actually setting this flag in the future}}

After an offline chat with [~zhuzh], we should introduce this again for 
consistency:

{code}
The goal to assign different regions to different slot sharing groups by 
default is to reduce waste of resources. In batch jobs, there can be one region 
which has data dependency on another region. And the resource computation for 
slots and managed memory will be affected:
  1. If these regions are in the same slot sharing group, the group will 
require a large slot which can host tasks from both regions.
  2. In managed memory fraction computing, tasks from both regions will be 
considered to compete for managed memory, so each task will be assigned a 
smaller managed memory fraction (FLIP-53).

However, those regions will not run at the same time, which results in a waste 
of resources.

For streaming jobs, all tasks will run at the same time. So assigning them to 
the same slot sharing group will not result in resource waste.
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23371) Disable AutoWatermarkInterval for bounded legacy sources

2021-07-13 Thread Timo Walther (Jira)
Timo Walther created FLINK-23371:


 Summary: Disable AutoWatermarkInterval for bounded legacy sources
 Key: FLINK-23371
 URL: https://issues.apache.org/jira/browse/FLINK-23371
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Timo Walther
Assignee: Timo Walther


{{LegacySourceTransformationTranslator}} currently has no special path for 
bounded sources. However, the table planner might add bounded legacy sources 
while AutoWatermarkInterval is still enabled. We should have the same logic as 
in {{SourceTransformationTranslator}} for disabling intermediate watermarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23370) Propagate Boundedness of SourceFunctionProvider to Transformation

2021-07-13 Thread Timo Walther (Jira)
Timo Walther created FLINK-23370:


 Summary: Propagate Boundedness of SourceFunctionProvider to 
Transformation
 Key: FLINK-23370
 URL: https://issues.apache.org/jira/browse/FLINK-23370
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


SourceFunctionProvider currently does not propagate the boundedness to the 
{{StreamGraphGenerator}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23330) Deprecate toAppendStream and toRetractStream

2021-07-09 Thread Timo Walther (Jira)
Timo Walther created FLINK-23330:


 Summary: Deprecate toAppendStream and toRetractStream
 Key: FLINK-23330
 URL: https://issues.apache.org/jira/browse/FLINK-23330
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


The new {{toDataStream}} and {{toChangelogStream}} should be stable by now. 
They will not be marked experimental in 1.14. So we can deprecate the old 
methods and remove them in a couple of releases.
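
A hedged migration sketch for users of the deprecated methods (the table and 
query are illustrative):

{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table result = tableEnv.sqlQuery("SELECT name, COUNT(*) AS cnt FROM Orders GROUP BY name");

// Deprecated:
// DataStream<Tuple2<Boolean, Row>> retract = tableEnv.toRetractStream(result, Row.class);

// Replacement with full changelog semantics:
DataStream<Row> changelog = tableEnv.toChangelogStream(result);

// For insert-only results, toDataStream replaces toAppendStream:
// DataStream<Row> appendOnly = tableEnv.toDataStream(result);
{code}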



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23320) Support semi/anti temporal joins

2021-07-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-23320:


 Summary: Support semi/anti temporal joins
 Key: FLINK-23320
 URL: https://issues.apache.org/jira/browse/FLINK-23320
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Timo Walther


Similar to FLINK-23305, we should also allow semi/anti joins for temporal joins 
such as:

{code}
SELECT T.*
FROM MyTable AS T
WHERE EXISTS (
SELECT * FROM VersionedTable FOR SYSTEM_TIME AS OF T.rowtime AS D
WHERE T.a = D.id)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23319) Support semi/anti lookup joins

2021-07-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-23319:


 Summary: Support semi/anti lookup joins
 Key: FLINK-23319
 URL: https://issues.apache.org/jira/browse/FLINK-23319
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Timo Walther


Similar to FLINK-23305, we should also allow semi/anti joins for lookup joins 
such as:

{code}
SELECT T.*
FROM MyTable AS T
WHERE EXISTS (
SELECT * FROM LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
WHERE T.a = D.id)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Feedback Collection Jira Bot

2021-07-08 Thread Timo Walther
weeks without any visible progress on the implementation.

On Fri, May 21, 2021 at 2:06 PM Konstantin Knauf <kna...@apache.org> wrote:

Hi Timo,

Thanks for joining the discussion. All rules except the unassigned rule do 
not apply to Sub-Tasks actually (like deprioritization, closing). 
Additionally, activity on a Sub-Task counts as activity for the parent. So, 
the parent ticket would not be touched by the bot as long as there is a 
single Sub-Task that has a discussion or an update. If you experience 
something different, this is a bug.

Is there a reason why it is important to assign all Sub-Tasks to the same 
person immediately? I am not sure if this kind of "reserving tickets" is a 
good idea in general, to be honest.

Cheers,

Konstantin

On Fri, May 21, 2021 at 12:00 PM Timo Walther <twal...@apache.org> wrote:

Hi Konstantin,

thanks for starting this discussion. I was also about to provide some 
feedback because I have the feeling that the bot is too aggressive at 
the moment.

Even a 14 days interval is a short period of time for bigger efforts 
that might include several subtasks. Currently, if we split an issue 
into subtasks usually most subtasks are assigned to the same person. But 
the bot requires us to update all subtasks again after 7 days. Could we 
disable the bot for subtasks or extend the period to 30 days?

The core problem in the past was that we had issues laying around 
untouched for years. Luckily, this is solved with the bot now. But going 
from years to 7 days spams the mail box quite a bit.

Regards,
Timo

On 21.05.21 09:22, Konstantin Knauf wrote:

Hi Robert,

Could you elaborate on your comment on test instabilities? Would test 
instabilities always get a fixVersion then?

Background: Test instabilities are supposed to be Critical. Critical 
tickets are deprioritized if they are unassigned and have not received an 
update for 14 days.

Cheers,

Konstantin

On Thu, May 20, 2021 at 9:34 AM Robert Metzger <rmetz...@apache.org> wrote:

+1
This would also cover test instabilities, which I personally believe should 
not be auto-deprioritized until they've been analyzed.

On Wed, May 19, 2021 at 1:46 PM Till Rohrmann <trohrm...@apache.org> wrote:

I like this idea. +1 for your proposal Konstantin.

Cheers,
Till

On Wed, May 19, 2021 at 1:30 PM Konstantin Knauf <konstan...@ververica.com> wrote:

Hi everyone,

Till and I recently discussed whether we should disable the 
"stale-blocker", "stale-critical", "stale-major" and "stale-minor" rules 
for tickets that have a fixVersion set. This would allow people to plan the 
upcoming release without tickets being deprioritized by the bot during the 
release cycle.

From my point of view, this is a good idea as long as we can agree to use 
the "fixVersion" a bit more conservatively. What do I mean by that? If you 
would categorize tickets planned for an upcoming release into:

* Must Have
* Should Have
* Nice-To-Have

only "Must Have" and "Should Have" tickets should get a fixVersion. From my 
observation, we currently often set the fixVersion if we just wished a 
feature was included in an upcoming release. Similarly, I often see bulk 
changes of fixVersion that "roll over" many tickets to the next release if 
they have not made it into the previous release although there is no 
concrete plan to fix them or they have even become obsolete by then. 
Excluding those from the bot would be counterproductive.

What do you think?

Cheers,

Konstantin

On Fri, Apr 23, 2021 at 2:25 PM Konstantin Knauf <kna...@apache.org> wrote:

Hi everyone,

After some offline conversations, I think, it makes sense to already open 
this thread now in order to collect feedback and suggestions around the 
Jira Bot.

The following two changes I will do right away:

* increase "stale-assigned.stale-days" to 14 days (Marta, Stephan, Nico 
have provided feedback that this is too aggressive).

* exclude Sub-Tasks from all rules except the "stale-assigned" rule (I 
think, this was just an oversight in the original discussion.)

Keep it coming.

Cheers,

Konstantin

--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk

--

Konstantin Knauf | Head of Product

+49 160 91394525

Follow us @VervericaData Ververica <https://www.ververica.com/>

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zha

[jira] [Created] (FLINK-23313) Reintroduce temporal table function documentation

2021-07-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-23313:


 Summary: Reintroduce temporal table function documentation
 Key: FLINK-23313
 URL: https://issues.apache.org/jira/browse/FLINK-23313
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Reporter: Timo Walther


FLIP-132 introduced the new {{FOR SYSTEM_TIME AS OF}} syntax and dropped the main 
documentation for temporal table functions. This causes a lot of confusion for 
users.

First, because processing time joins are not supported yet.

Second, because a primary key might not always be present in the current 
pipeline when using Table API.

We have not deprecated `createTemporalTableFunction` and the documentation in 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#join-with-temporal-table
 is not enough.
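
Until the documentation is reintroduced, a hedged sketch of the temporal table 
function usage it should cover (table, column, and function names are 
illustrative; a registered "Rates" table and a {{TableEnvironment}} "tableEnv" 
are assumed):

{code}
// Register a temporal table function over a changing "Rates" table.
// Static import of org.apache.flink.table.api.Expressions.$ is assumed.
TemporalTableFunction rates = tableEnv
        .from("Rates")
        .createTemporalTableFunction($("update_time"), $("currency"));
tableEnv.createTemporarySystemFunction("Rates", rates);

// SQL usage via LATERAL TABLE with a time attribute of the probe side:
// SELECT o.amount * r.rate
// FROM Orders AS o,
//      LATERAL TABLE (Rates(o.order_time)) AS r
// WHERE o.currency = r.currency
{code}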



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23306) FlinkRelMdUniqueKeys causes exception when used with new Schema

2021-07-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-23306:


 Summary: FlinkRelMdUniqueKeys causes exception when used with new 
Schema
 Key: FLINK-23306
 URL: https://issues.apache.org/jira/browse/FLINK-23306
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Timo Walther
Assignee: Timo Walther


FlinkRelMdUniqueKeys should not use the deprecated `TableSchema`. It causes 
exceptions when e.g. {{sourceWatermark()}} is used in the schema.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23305) Support semi/anti interval joins

2021-07-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-23305:


 Summary: Support semi/anti interval joins
 Key: FLINK-23305
 URL: https://issues.apache.org/jira/browse/FLINK-23305
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Timo Walther


I don't see a reason why we shouldn't support interval joins also for semi/anti 
joins like:

{code}
SELECT *
FROM OT
WHERE EXISTS (
SELECT *
FROM TT
WHERE TT.tx = OT.tx AND
TT.isEnd = TRUE AND
TT.rowtime BETWEEN OT.rowtime AND OT.rowtime + INTERVAL '1' HOUR)
{code}

The resulting plan contains a join operation anyway but without detecting the 
interval.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Incrementally deprecating the DataSet API

2021-07-07 Thread Timo Walther

Hi Etienne,

sorry for the late reply due to my vacation last week.

Regarding: "support of aggregations in batch mode for DataStream API 
[...] is there a plan to solve it before the actual drop of DataSet API"


Just to clarify it again: we will not drop the DataSet API any time 
soon. So users will have enough time to update their pipelines. There 
are a couple of features missing to fully switch to DataStream API in 
batch mode. Thanks for opening an issue, this is helpful for us to 
gradually remove those barriers. They don't need to have a "Blocker" 
priority in JIRA for now.


But aggregations are a good example where we should discuss if it would 
be easier to simply switch to Table API for that. Table API has a lot of 
aggregation optimizations and can work on binary data. Also, joins should 
be easier in Table API. DataStream API can become a very low-level API in 
the near future and most use cases (esp. the batch ones) should be 
possible in Table API.
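
To illustrate (a sketch only, with made-up table and column names), such an 
aggregation in Table API batch mode is only a few lines:

{code}
// Static import of org.apache.flink.table.api.Expressions.$ is assumed.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

Table orders = tableEnv.from("Orders");
Table revenuePerUser = orders
        .groupBy($("user_id"))
        .select($("user_id"), $("amount").sum().as("revenue"));
{code}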


Regarding: "Is it needed to port these Avro enhancements to new 
DataStream connectors (add a new equivalent of 
ParquetColumnarRowInputFormat but for Avro)"


We should definitely not lose functionality. The same functionality 
should be present in the new connectors. The question is rather whether 
we need to offer a DataStream API connector or if a Table API connector 
would be nicer to use (also nicely integrated with catalogs).


So a user can use a simple CREATE TABLE statement to configure the 
connector; a simpler abstraction is hardly possible. With 
`tableEnv.toDataStream(table)` you can then continue in the DataStream API 
if there is still a need for it.
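
For example (a sketch with made-up connector options):

{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql(
        "CREATE TABLE input_table (id INT, payload STRING) WITH (" +
        " 'connector' = 'filesystem', 'path' = '/tmp/data', 'format' = 'csv')");

Table table = tableEnv.from("input_table");
// Continue in DataStream API only where it is still needed.
DataStream<Row> stream = tableEnv.toDataStream(table);
{code}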


Regarding: "there are parquet bugs still open on deprecated parquet 
connector"


Yes, bugs should still be fixed in 1.13.

Regarding: "I've been doing TPC-DS benchmarks with Flink lately"

Great to hear that :-)

Did you also see the recent discussion? A TPC-DS benchmark can further 
be improved by providing statistics. Maybe this is helpful to you:


https://lists.apache.org/thread.html/ra383c23f230ab8e7fa16ec64b4f277c267d6358d55cc8a0edc77bb63%40%3Cuser.flink.apache.org%3E

I will prepare a blog post shortly.

Regards,
Timo



On 06.07.21 15:05, Etienne Chauchot wrote:

Hi all,

Any comments ?

cheers,

Etienne

On 25/06/2021 15:09, Etienne Chauchot wrote:

Hi everyone,

@Timo, my comments are inline for steps 2, 4 and 5, please tell me 
what you think.


Best

Etienne


On 23/06/2021 15:27, Chesnay Schepler wrote:
If we want to publicize this plan more, shouldn't we have a rough 
timeline for when 2.0 is on the table?


On 6/23/2021 2:44 PM, Stephan Ewen wrote:

Thanks for writing this up, this also reflects my understanding.

I think a blog post would be nice, ideally with an explicit call for
feedback so we learn about user concerns.
A blog post has a lot more reach than an ML thread.

Best,
Stephan


On Wed, Jun 23, 2021 at 12:23 PM Timo Walther  
wrote:



Hi everyone,

I'm sending this email to make sure everyone is on the same page about
slowly deprecating the DataSet API.

There have been a few thoughts mentioned in presentations, offline
discussions, and JIRA issues. However, I have observed that there are
still some concerns or different opinions on what steps are 
necessary to

implement this change.

Let me summarize some of the steps and assumptions and let's have a
discussion about it:

Step 1: Introduce a batch mode for Table API (FLIP-32)
[DONE in 1.9]

Step 2: Introduce a batch mode for DataStream API (FLIP-134)
[DONE in 1.12]



I've been using DataSet API and I tested migrating to DataStream + 
batch mode.


I opened this (1) ticket regarding the support of aggregations in 
batch mode for DataStream API. It seems that the join operation (at least) 
does not work in batch mode, even though I managed to implement a join 
using the low-level KeyedCoProcessFunction (thanks Seth for the pointer!).


=> Should it be considered a blocker? Is there a plan to solve it 
before the actual drop of the DataSet API? Maybe in step 6?


[1] https://issues.apache.org/jira/browse/FLINK-22587




Step 3: Soft deprecate DataSet API (FLIP-131)
[DONE in 1.12]

We updated the documentation recently to make this deprecation even 
more

visible. There is a dedicated `(Legacy)` label right next to the menu
item now.

We won't deprecate concrete classes of the API with a @Deprecated
annotation to avoid extensive warnings in logs until then.

Step 4: Drop the legacy SQL connectors and formats (FLINK-14437)
[DONE in 1.14]

We dropped code for ORC, Parquet, and HBase formats that were only used
by DataSet API users. The removed classes had no documentation and 
were

not annotated with one of our API stability annotations.

The old functionality should be available through the new sources and
sinks for Table API and DataStream API. If not, we should bring them
into a shape that they can be a full replacement.

DataSet users are encouraged to either upgrade the API or use Flink 1.13.

[ANNOUNCE] The term "blink" has been removed from the code base

2021-07-06 Thread Timo Walther

Hi everyone,

as discussed previously [1] and tracked in FLINK-14437, we executed the 
last step of FLIP-32 and removed all occurrences of the term "blink" in 
the code base.


This includes renaming the following Maven modules:

flink-table-planner-blink -> flink-table-planner
flink-table-runtime-blink -> flink-table-runtime
flink-table-uber-blink    -> flink-table-uber

This will cause a couple of merge conflicts. Be careful when merging PRs 
that change the mentioned modules to not break the master.


In order to reduce user confusion, we should not use "Blink 
engine/planner" in docs/presentations/talks and refer to it as "Flink 
SQL", "Flink Table API", or "Flink SQL planner".


Sorry for any inconvenience that this refactoring might have caused.

Regards,
Timo

[1] 
https://lists.apache.org/thread.html/r0851e101e37fbab273775b6a252172c7a9f7c7927107c160de779831%40%3Cdev.flink.apache.org%3E


Re: Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter

2021-06-24 Thread Timo Walther

Hi Jack,

thanks for sharing your proposal with us. I totally understand the 
issues that you are trying to solve. Having more flexible type support 
in the connectors is definitely a problem that we would like to address 
in the mid term. It is already considered in our internal roadmap 
planning.


I haven't taken a deeper look at your current proposal but will do so 
soon. Until then, let me give you some general feedback.


I see a couple of orthogonal issues that we need to solve:

1) The TIMESTAMP_WITH_TIME_ZONE problem: this is one of the easier 
issues that we simply need to fix on the runtime side. We are planning 
to support this type because it is one of the core data structures that 
you need in basically every pipeline.


2) Unsupported types of other systems: As Jark said, we offer support 
for RAW types and also user-defined structured types. Since most of the 
prerequisite work has been done for user-defined types (e.g. a central 
type registry), I could imagine that we are able to extend Flink's type 
system soon. My idea would be to provide modules via Flink's module 
system to load Postgres- or MySQL-specific types that could then be used 
at all regular locations such as DDL or functions.


3) Add connector-specific type information in DDL: We should allow enriching 
the automatic schema conversion step when translating DDL into other 
systems' types. This is where your proposal might make sense.



Regards,
Timo


On 24.06.21 14:19, 云华 wrote:


@Jark Wu thanks for the reply. However, there are several cases I want to cover:

1. Unknown types (CITEXT):
Flink SQL cannot execute "CREATE TABLE string_table (pk SERIAL, vc VARCHAR(2), vcv 
CHARACTER VARYING(2), ch CHARACTER(4), c CHAR(3), t TEXT, b BYTEA, bnn BYTEA NOT NULL, ct 
CITEXT, PRIMARY KEY(pk));".
This is because 
org.apache.flink.connector.jdbc.catalog.PostgresCatalog#fromJDBCType cannot 
support CITEXT.

2. TIMESTAMP_WITH_TIME_ZONE unsupported: 
org.apache.flink.table.runtime.typeutils.InternalSerializers#createInternal 
cannot support TIMESTAMP_WITH_TIME_ZONE.
3. Unsupported types (MySQL): 
org.apache.flink.connector.jdbc.dialect.MySQLDialect#unsupportedTypes provides 
the unsupported types.
4. Unsupported types (Postgres): 
org.apache.flink.connector.jdbc.dialect.PostgresDialect#unsupportedTypes 
provides the unsupported types.
5. (Postgres) Parts of the type implementations are referenced from Postgres: 
https://www.postgresql.org/docs/12/datatype.html
6. (MySQL) Parts of the type implementations are referenced from MySQL: 
https://dev.mysql.com/doc/refman/8.0/en/data-types.html


Please let me know if you have any suggestions.


--
From: Jark Wu 
Sent: Wednesday, June 23, 2021, 23:13
To: dev ; 云华 
Subject: Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter

Hi,

`TIMESTAMP_WITH_TIME_ZONE` is not supported in the Flink SQL engine,
  even though it is listed in the type API.

I think what you are looking for is the RawValueType which can be used as
user-defined type. You can use `DataTypes.RAW(TypeInformation)` to define
  a Raw type with the given TypeInformation which includes the serializer
and deserializer.
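
For example, a minimal sketch (MyPayload is a placeholder for a class without a 
built-in SQL counterpart):

{code}
// Sketch only: declaring a column with a RAW type backed by a TypeInformation.
Schema schema = Schema.newBuilder()
        .column("id", DataTypes.INT())
        .column("payload", DataTypes.RAW(TypeInformation.of(MyPayload.class)))
        .build();
{code}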

Best,
Jark
On Wed, 23 Jun 2021 at 21:09, 云华  wrote:

  Hi everyone,
  I want to rework the type conversion system in the connector and flink-table 
modules to be reusable and scalable.
  In the Postgres system, the type '_citext' is not supported in 
org.apache.flink.connector.jdbc.catalog.PostgresCatalog#fromJDBCType. What's 
more, 
org.apache.flink.table.runtime.typeutils.InternalSerializers#createInternal 
cannot support TIMESTAMP_WITH_TIME_ZONE.
  For more background and API design: 
https://issues.apache.org/jira/browse/FLINK-23122.
  Please let me know if this matches your thoughts.


  Regards,
  Jack





[DISCUSS] Incrementally deprecating the DataSet API

2021-06-23 Thread Timo Walther

Hi everyone,

I'm sending this email to make sure everyone is on the same page about 
slowly deprecating the DataSet API.


There have been a few thoughts mentioned in presentations, offline 
discussions, and JIRA issues. However, I have observed that there are 
still some concerns or different opinions on what steps are necessary to 
implement this change.


Let me summarize some of the steps and assumptions and let's have a 
discussion about it:


Step 1: Introduce a batch mode for Table API (FLIP-32)
[DONE in 1.9]

Step 2: Introduce a batch mode for DataStream API (FLIP-134)
[DONE in 1.12]

Step 3: Soft deprecate DataSet API (FLIP-131)
[DONE in 1.12]

We updated the documentation recently to make this deprecation even more 
visible. There is a dedicated `(Legacy)` label right next to the menu 
item now.


We won't deprecate concrete classes of the API with a @Deprecated 
annotation to avoid extensive warnings in logs until then.


Step 4: Drop the legacy SQL connectors and formats (FLINK-14437)
[DONE in 1.14]

We dropped code for ORC, Parquet, and HBase formats that were only used 
by DataSet API users. The removed classes had no documentation and were 
not annotated with one of our API stability annotations.


The old functionality should be available through the new sources and 
sinks for Table API and DataStream API. If not, we should bring them 
into a shape that they can be a full replacement.


DataSet users are encouraged to either upgrade the API or use Flink 
1.13. Users can either just stay at Flink 1.13 or copy only the format's 
code to a newer Flink version. We aim to keep the core interfaces (i.e. 
InputFormat and OutputFormat) stable until the next major version.


We will maintain/allow important contributions to dropped connectors in 
1.13. So 1.13 could be considered as kind of a DataSet API LTS release.


Step 5: Drop the legacy SQL planner (FLINK-14437)
[DONE in 1.14]

This included dropping support of DataSet API with SQL.

Step 6: Connect both Table and DataStream API in batch mode (FLINK-20897)
[PLANNED in 1.14]

Step 7: Reach feature parity of Table API/DataStream API with DataSet API
[PLANNED for 1.14++]

We need to identify blockers when migrating from DataSet API to Table 
API/DataStream API. Here we need to establish a good feedback pipeline 
to include DataSet users in the roadmap planning.


Step 8: Drop the Gelly library

No concrete plan yet. Latest would be the next major Flink version aka 
Flink 2.0.


Step 9: Drop DataSet API

Planned for the next major Flink version aka Flink 2.0.


Please let me know if this matches your thoughts. We can also convert 
this into a blog post or mention it in the next release notes.


Regards,
Timo



[jira] [Created] (FLINK-23116) Update documentation about TableDescriptors

2021-06-23 Thread Timo Walther (Jira)
Timo Walther created FLINK-23116:


 Summary: Update documentation about TableDescriptors
 Key: FLINK-23116
 URL: https://issues.apache.org/jira/browse/FLINK-23116
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Table SQL / API
Reporter: Timo Walther


We should update the documentation at a couple of places to show different use 
cases. In any case, we need detailed documentation for the Table API/common 
API section.
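
A hedged sketch of the kind of example the documentation could show, based on 
the TableDescriptor API from FLIP-129 (connector and options are illustrative; 
a {{TableEnvironment}} "tableEnv" is assumed):

{code}
TableDescriptor descriptor = TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("name", DataTypes.STRING())
                .build())
        .option("rows-per-second", "10")
        .build();

// Register it under a name or derive a Table from it directly.
tableEnv.createTemporaryTable("SourceTable", descriptor);
Table source = tableEnv.from(descriptor);
{code}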



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-129 Register sources/sinks in Table API

2021-06-21 Thread Timo Walther

+1 (binding)

Thanks for driving this.

Regards,
Timo

On 21.06.21 13:24, Ingo Bürk wrote:

Hi everyone,

thanks for all the feedback so far. Based on the discussion[1] we seem to
have consensus, so I would like to start a vote on FLIP-129 for which the
FLIP has now also been updated[2].

The vote will last for at least 72 hours (Thu, Jun 24th 12:00 GMT) unless
there is an objection or insufficient votes.


Thanks
Ingo

[1]
https://lists.apache.org/thread.html/rc75d64e889bf35592e9843dde86e82bdfea8fd4eb4c3df150112b305%40%3Cdev.flink.apache.org%3E
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API





[jira] [Created] (FLINK-23072) Add benchmarks for SQL internal and external serializers

2021-06-21 Thread Timo Walther (Jira)
Timo Walther created FLINK-23072:


 Summary: Add benchmarks for SQL internal and external serializers
 Key: FLINK-23072
 URL: https://issues.apache.org/jira/browse/FLINK-23072
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Timo Walther


Currently, we don't benchmark any of the serializers of the SQL layer.

We should test {{RowData}} with at least a field of each logical type.

Also {{ExternalTypeInfo}} might be interesting to monitor because it is used 
between Table API and DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23040) Consider ConfigOption fallback keys in FactoryUtil

2021-06-18 Thread Timo Walther (Jira)
Timo Walther created FLINK-23040:


 Summary: Consider ConfigOption fallback keys in FactoryUtil
 Key: FLINK-23040
 URL: https://issues.apache.org/jira/browse/FLINK-23040
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


We are currently not taking fallback keys into consideration when performing 
validation in FactoryUtil.
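
For illustration, a hedged sketch of an option with a fallback key that the 
validation should also accept (both key names are made up):

{code}
public static final ConfigOption<String> SCAN_STARTUP_MODE =
        ConfigOptions.key("scan.startup.mode")
                .stringType()
                .defaultValue("group-offsets")
                // The old key should still be accepted by FactoryUtil's validation.
                .withFallbackKeys("connector.startup-mode");
{code}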



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23019) Avoid errors when identifiers use reserved keywords

2021-06-17 Thread Timo Walther (Jira)
Timo Walther created FLINK-23019:


 Summary: Avoid errors when identifiers use reserved keywords
 Key: FLINK-23019
 URL: https://issues.apache.org/jira/browse/FLINK-23019
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Timo Walther


We add more and more keywords and built-in functions with special meaning to 
SQL. However, this could be quite annoying for users that have columns named 
like a keyword. E.g. {{timestamp}} or {{current_timestamp}}.

We should investigate if we can do better and avoid forcing escaping with 
backticks. IIRC, Calcite also offers functionality for that.
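
To illustrate the current behavior (a sketch; the table, columns, and connector 
are made up, and a {{TableEnvironment}} "tableEnv" is assumed):

{code}
// Without the backticks, the parser currently rejects these statements.
tableEnv.executeSql(
        "CREATE TABLE t (`timestamp` TIMESTAMP(3), `value` DOUBLE) WITH ('connector' = 'datagen')");
tableEnv.sqlQuery("SELECT `timestamp`, `value` FROM t");
{code}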



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-129 (Update): Registering sources/sinks on Table API without SQL

2021-06-10 Thread Timo Walther

Hi Ingo,

thanks for giving FLIP-129 an update before finally implementing it. I 
wouldn't start with a voting thread right away but collect more feedback 
from the community in a [DISCUSS] thread before.


Also, voting threads should be performed on the updated wiki page and 
include the voting deadline.


Regards,
Timo


On 10.06.21 12:02, Ingo Bürk wrote:

Hello everyone,

we would like to pick up work on FLIP-129 which aims to improve the Table
API by supporting the creation of sources / sinks without having to go
through SQL/DDL. This FLIP was approved a while ago, and some things have
changed since then. We'd like to propose a few changes, see [1], before
starting work on it.
Our proposal is mainly motivated by reducing the scope in some parts to
improve maintainability and relying more on ConfigOptions being the single
source of truth. We also want to expose this functionality for
non-temporary tables.

If anyone has further input please let us know and discuss. Otherwise we
wouldn't mind collecting a couple +1s. :-)

[1]
https://docs.google.com/document/d/1tpirvF0u723QF005UrgdbvF-Tp0Jbg_qhlbda4mk7Ck/edit?usp=sharing


Regards
Ingo





[jira] [Created] (FLINK-22916) Revisit and close JIRA issues around legacy planner

2021-06-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-22916:


 Summary: Revisit and close JIRA issues around legacy planner
 Key: FLINK-22916
 URL: https://issues.apache.org/jira/browse/FLINK-22916
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


We should review which legacy planner issues should be migrated to the Blink 
planner and which ones can simply be closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22880) Remove "blink" term in code base

2021-06-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-22880:


 Summary: Remove "blink" term in code base
 Key: FLINK-22880
 URL: https://issues.apache.org/jira/browse/FLINK-22880
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Apart from FLINK-22879 and some API parts (such as EnvironmentSettings and the old 
SQL Client YAML), we should not use the term "blink" in the code base and 
documentation anymore. To give some background information, we should only 
document that the current planner was called the "blink planner" in the past.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


<    1   2   3   4   5   6   7   8   9   10   >