[jira] [Created] (FLINK-29369) Commit delete file failure due to Checkpoint aborted

2022-09-20 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-29369:


 Summary: Commit delete file failure due to Checkpoint aborted
 Key: FLINK-29369
 URL: https://issues.apache.org/jira/browse/FLINK-29369
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.3.0, table-store-0.2.1


After a checkpoint abort, files from cp5 may fall into cp6. Because the 
compaction commit deletes files first and then adds them, this may lead to:
- Delete a file
- Add the same file again

As a result, the deleted file can no longer be found.

We need to properly handle the merge of the compaction files.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29368) Modify DESCRIBE statement docs for new syntax

2022-09-20 Thread Yunhong Zheng (Jira)
Yunhong Zheng created FLINK-29368:
-

 Summary: Modify DESCRIBE statement docs for new syntax
 Key: FLINK-29368
 URL: https://issues.apache.org/jira/browse/FLINK-29368
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Yunhong Zheng
 Fix For: 1.17.0


In Flink 1.17.0, the DESCRIBE statement syntax will be changed to DESCRIBE/DESC 
[EXTENDED] [catalog_name.][database_name.]table_name 
[PARTITION(partition_spec)] [col_name]. The docs for this statement therefore 
need to be updated.
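
For illustration, a minimal sketch of issuing the new syntax from a Table API program (assuming the syntax above lands as described; the table and column names are made up):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DescribeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE orders (order_id BIGINT, amount DOUBLE) WITH ('connector' = 'datagen')");

        // Plain form, already supported today.
        tEnv.executeSql("DESCRIBE orders").print();

        // Extended form described in this ticket (only valid once the new syntax is implemented).
        tEnv.executeSql("DESC EXTENDED orders amount").print();
    }
}
{code}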



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29367) Avoid manifest corruption for incorrect checkpoint recovery

2022-09-20 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-29367:


 Summary: Avoid manifest corruption for incorrect checkpoint 
recovery
 Key: FLINK-29367
 URL: https://issues.apache.org/jira/browse/FLINK-29367
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Caizhi Weng
 Fix For: table-store-0.3.0


When the job runs to checkpoint N, if the user recovers from an old checkpoint 
(such as checkpoint N-5), the sink of the current FTS will cause a manifest 
corruption because duplicate files may be committed.

We should avoid such corruption, and the storage should be robust enough.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29366) Use flink shaded jackson to parse flink-conf.yaml

2022-09-20 Thread Yuan Kui (Jira)
Yuan Kui created FLINK-29366:


 Summary: Use flink shaded jackson to parse flink-conf.yaml
 Key: FLINK-29366
 URL: https://issues.apache.org/jira/browse/FLINK-29366
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.13.3
Reporter: Yuan Kui


Currently we use a simple 
implementation (org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource)
 to parse flink-conf.yaml, which can only parse flat key-value pairs.

Although this has been discussed historically 
(see https://github.com/stratosphere/stratosphere/issues/113), in real 
production environments we often need to put complex structures into 
flink-conf.yaml. That requires a proper YAML library, so I suggest using a YAML 
library to parse flink-conf.yaml instead of our own implementation.

In fact, the flink-core module already has a dependency on flink-shaded-jackson, 
which can parse the YAML format, so we can use this jar without adding more 
dependencies.
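
A minimal sketch of the idea, written against the plain Jackson YAML API for readability (my assumption is that the shaded artifact relocates the same classes under a different package prefix); the file path and lookup key are only illustrative:
{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.io.File;

public class YamlConfSketch {
    public static void main(String[] args) throws Exception {
        // A full YAML parser returns a tree, so nested structures survive
        // instead of being flattened into string key-value pairs.
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        JsonNode root = mapper.readTree(new File("conf/flink-conf.yaml"));

        // Hypothetical nested section that loadYAMLResource cannot represent today.
        JsonNode kerberos = root.at("/security/kerberos/login");
        System.out.println(kerberos);
    }
}
{code}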



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29365) Millisecond behind latest jumps after Flink 1.15.2 upgrade

2022-09-20 Thread Wilson Wu (Jira)
Wilson Wu created FLINK-29365:
-

 Summary: Millisecond behind latest jumps after Flink 1.15.2 upgrade
 Key: FLINK-29365
 URL: https://issues.apache.org/jira/browse/FLINK-29365
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: 1.15.2
 Environment: Redeployment from 1.14.4 to 1.15.2
Reporter: Wilson Wu
 Attachments: Screen Shot 2022-09-19 at 2.50.56 PM.png

(First time filing a ticket in the Flink community, please let me know if there 
are any guidelines I need to follow.)

I noticed a very strange behavior with a recent version bump from Flink 1.14.4 
to 1.15.2. My project consumes around 30K records per second from a sharded 
Kinesis stream, and during a version upgrade it follows the best practice of 
first triggering a savepoint from the running job, starting the new job from 
that savepoint, and then removing the old job. So far so good, and this 
procedure has been tested multiple times without any issue on 1.14.4. Usually, 
after a version upgrade, our job has a few minutes of delay in millisecond 
behind latest, but it catches up quickly (within 30 minutes). Our savepoint is 
around one hundred MB in size, and our job DAG becomes 90-100% busy with some 
backpressure when we redeploy, but after 10-20 minutes it goes back to normal.

Then the strange thing happened: when I redeployed with the 1.15.2 upgrade from 
a running 1.14.4 job, I could see that a savepoint had been created and the new 
job was running, and all the metrics looked fine, except that [millisecond 
behind the 
latest|https://flink.apache.org/news/2019/02/25/monitoring-best-practices.html] 
suddenly jumped to 10 hours, and it took days for my application to catch up 
with the latest record in the Kinesis stream. I don't understand why it jumps 
from 0 seconds to 10+ hours when we restart the new job. The only notable change 
I introduced with the version bump is switching 
[failOnError|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.html]
 from true to false, but I don't think this is the root cause.

I also tried redeploying the new 1.15.2 job with a changed parallelism; 
redeploying a job that is already on 1.15.2 does not introduce a big delay, so I 
assume the issue above only happens when we bump the version from 1.14.4 to 
1.15.2 (note the attached screenshot). I did the bump twice and saw the same 
10+ hour jump in delay both times, and we have no changes related to timezones.

Please let me know if this can be filed as a bug, as I do not have a running 
project with the full Kinesis setup available to reproduce the issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29364) Root cause of Exceptions thrown in the SourceReader start() method gets "swallowed".

2022-09-20 Thread Alexander Fedulov (Jira)
Alexander Fedulov created FLINK-29364:
-

 Summary: Root cause of Exceptions thrown in the SourceReader 
start() method gets "swallowed".
 Key: FLINK-29364
 URL: https://issues.apache.org/jira/browse/FLINK-29364
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.15.2
Reporter: Alexander Fedulov


If an exception is thrown in the {_}SourceReader{_}'s _start()_ method, its 
root cause does not get captured.

The details are still available here: 
[Task.java#L758|https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758]

But execution falls through to 
[Task.java#L780|https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780]
and cancels the source invokable without recording the actual root cause.
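
A simplified, self-contained illustration of the pattern (not the actual Task.java code; the method names and exception message are made up):
{code:java}
// Illustration only: how a root cause is lost when the catch block reacts to
// the failure (e.g. by cancelling) without recording the caught throwable.
public class SwallowedCauseSketch {

    public static void main(String[] args) {
        try {
            startReader(); // stands in for SourceReader#start()
        } catch (Throwable t) {
            // Lossy handling: only the fact of cancellation is surfaced.
            System.err.println("Cancelling source invokable");
            // Cause-preserving handling: record or chain 't' before cancelling.
            throw new RuntimeException("SourceReader failed in start()", t);
        }
    }

    private static void startReader() {
        throw new IllegalStateException("hypothetical root cause: broker unreachable");
    }
}
{code}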

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29363) Allow web ui to fully redirect to other page

2022-09-20 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-29363:
-

 Summary: Allow web ui to fully redirect to other page
 Key: FLINK-29363
 URL: https://issues.apache.org/jira/browse/FLINK-29363
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.15.2
Reporter: Zhenqiu Huang


In a streaming platform, the web UI usually integrates with an internal 
authentication and authorization system. When validation fails, the request 
needs to be redirected to a landing page, but this does not work for AJAX 
requests. It would be great to make the web UI configurable to allow a full 
redirect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29362) Allow loading dynamic config for kerberos authentication in CliFrontend

2022-09-20 Thread Biao Geng (Jira)
Biao Geng created FLINK-29362:
-

 Summary: Allow loading dynamic config for kerberos authentication 
in CliFrontend
 Key: FLINK-29362
 URL: https://issues.apache.org/jira/browse/FLINK-29362
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client
Reporter: Biao Geng


In the 
[code|https://github.com/apache/flink/blob/97f5a45cd035fbae37a7468c6f771451ddb4a0a4/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L1167],
 Flink's client calls {{SecurityUtils.install(new 
SecurityConfiguration(cli.configuration));}} with configs (e.g. 
{{security.kerberos.login.principal}} and {{security.kerberos.login.keytab}}) 
taken only from flink-conf.yaml.
If users specify these two configs via the -D option, they do not take effect on 
the client, because {{cli.parseAndRun(args)}} is executed after the security 
configs from flink-conf.yaml have been installed.
However, if a user specifies principal A in the client's flink-conf.yaml and 
uses the -D option to specify principal B, the launched YARN container will use 
principal B even though the job was submitted on the client side with 
principal A.

Such behavior can be misleading, as Flink provides two ways to set a config but 
does not keep client and cluster consistent. It also affects users who want to 
use Flink with Kerberos, since they must modify flink-conf.yaml whenever they 
want to use another Kerberos user.
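
A minimal sketch of the idea (not the actual CliFrontend change): merge the -D overrides into the effective configuration before installing the security context, so client and cluster use the same principal. The principal and keytab values below are hypothetical.
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;

public class SecuritySetupSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = GlobalConfiguration.loadConfiguration();

        // Hypothetical values that would come from the parsed -D options.
        config.setString("security.kerberos.login.principal", "principalB@EXAMPLE.COM");
        config.setString("security.kerberos.login.keytab", "/path/to/principalB.keytab");

        // Install the security context only after the overrides are merged in.
        SecurityUtils.install(new SecurityConfiguration(config));
    }
}
{code}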




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Kubernetes Operator 1.2.0 release

2022-09-20 Thread Márton Balassi
Thanks, +1 for the schedule.

On Tue, Sep 20, 2022 at 6:20 AM Yang Wang  wrote:

> Thanks Gyula for managing the release.
>
> +1 for the time schedule.
>
>
> Best,
> Yang
>
>
>
> Őrhidi Mátyás  于2022年9月19日周一 22:28写道:
>
> > Thanks Gyula!
> >
> > Sounds good! Happy to help as always.
> >
> > Cheers,
> > Matyas
> >
> > On Mon, Sep 19, 2022 at 1:37 PM Gyula Fóra  wrote:
> >
> > > Hi Devs!
> > >
> > > The originally planned (October 1) release date for 1.2.0 is fast
> > > approaching and we are already slightly behind schedule. There are a
> > couple
> > > outstanding bug tickets with 2 known blockers at the moment that should
> > be
> > > fixed in the next few days.
> > >
> > > As we are not aware of any larger critical issues or outstanding
> > features I
> > > propose the following adjusted release schedule:
> > >
> > >
> > > *Feature Freeze: September 23*
> > > *Release Branch Cut & RC1: September 28*
> > >
> > > Hopefully then we can finalize the release somewhere in the first week
> of
> > > October :)
> > >
> > > I volunteer as the release manager.
> > >
> > > Cheers,
> > > Gyula
> > >
> >
>


[jira] [Created] (FLINK-29361) How to set headers with the new Flink KafkaSink

2022-09-20 Thread Xin Hao (Jira)
Xin Hao created FLINK-29361:
---

 Summary: How to set headers with the new Flink KafkaSink
 Key: FLINK-29361
 URL: https://issues.apache.org/jira/browse/FLINK-29361
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Xin Hao


I'm using Flink 1.15.2. When I try to migrate to the new KafkaSink, it seems 
that it is not possible to add Kafka record headers.

I think we should add this feature, or document it if it already exists.
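
For reference, a possible workaround sketch using a custom serialization schema (the topic name and header key/value are made up); the ProducerRecord returned from the schema accepts headers directly:
{code:java}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

public class HeaderAddingSchema implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        RecordHeaders headers = new RecordHeaders();
        // Hypothetical header; any key/value pair can be attached here.
        headers.add("origin", "flink-job".getBytes(StandardCharsets.UTF_8));
        return new ProducerRecord<>(
                "my-topic",                               // hypothetical topic
                null,                                     // let Kafka choose the partition
                null,                                     // no record key
                element.getBytes(StandardCharsets.UTF_8), // record value
                headers);
    }
}
{code}
The schema would then be plugged in via KafkaSink.builder().setRecordSerializer(new HeaderAddingSchema()). If this is already the intended way, documenting it would help.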



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[SUMMARY] Flink 1.16 release sync of 2022-09-20

2022-09-20 Thread Xingbo Huang
I would like to give you a brief update on the Flink 1.16 release sync
meeting of 2022-09-20.

*We created release-1.16 rc0 last Wednesday (14th of September 2022)*

*Currently, there are 3 blocker issues which are being worked on. Many
thanks to these contributors and reviewers.*
- FLINK-29219, FLINK-29274, FLINK-29315

*We still have some critical test instabilities [1] that need to be resolved*

*We are preparing the release notes [2] and the release announcement [3]*

For more information about Flink release 1.16, you can refer to
https://cwiki.apache.org/confluence/display/FLINK/1.16+Release

The next Flink release sync will be on Tuesday the 27th of September at 9am
CEST / 3pm China Standard Time / 7am UTC. The link can be found on the
following page
https://cwiki.apache.org/confluence/display/FLINK/1.16+Release#id-1.16Release-Syncmeeting
.

On behalf of all the release managers,

best regards,
Xingbo

[1]
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20priority%20in%20(Blocker%2C%20Critical)%20AND%20fixVersion%20%3D%201.16.0%20ORDER%20BY%20summary%20ASC%2C%20priority%20DESC
[2] https://github.com/apache/flink/pull/20859
[3]
https://docs.google.com/document/d/1rIBNpzJulqEKJCuYtWtG-vDmSsGpN9sip_ewpzMequ0/edit#


[jira] [Created] (FLINK-29358) Pulsar Table Connector testing

2022-09-20 Thread Yufei Zhang (Jira)
Yufei Zhang created FLINK-29358:
---

 Summary: Pulsar Table Connector testing
 Key: FLINK-29358
 URL: https://issues.apache.org/jira/browse/FLINK-29358
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Affects Versions: 1.17.0
Reporter: Yufei Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29359) Pulsar Table Connector pom config and packaging

2022-09-20 Thread Yufei Zhang (Jira)
Yufei Zhang created FLINK-29359:
---

 Summary: Pulsar Table Connector pom config and packaging
 Key: FLINK-29359
 URL: https://issues.apache.org/jira/browse/FLINK-29359
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Affects Versions: 1.17.0
Reporter: Yufei Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29360) Pulsar Table Connector Documentation

2022-09-20 Thread Yufei Zhang (Jira)
Yufei Zhang created FLINK-29360:
---

 Summary: Pulsar Table Connector Documentation
 Key: FLINK-29360
 URL: https://issues.apache.org/jira/browse/FLINK-29360
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Affects Versions: 1.17.0
Reporter: Yufei Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29356) Pulsar Table Source code: implementation

2022-09-20 Thread Yufei Zhang (Jira)
Yufei Zhang created FLINK-29356:
---

 Summary: Pulsar Table Source code: implementation
 Key: FLINK-29356
 URL: https://issues.apache.org/jira/browse/FLINK-29356
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Affects Versions: 1.17.0
Reporter: Yufei Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29357) Pulsar Table Sink code: implementation

2022-09-20 Thread Yufei Zhang (Jira)
Yufei Zhang created FLINK-29357:
---

 Summary: Pulsar Table Sink code: implementation
 Key: FLINK-29357
 URL: https://issues.apache.org/jira/browse/FLINK-29357
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Affects Versions: 1.17.0
Reporter: Yufei Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Externalized connector release details

2022-09-20 Thread Chesnay Schepler

> After 1.16, only patches are accepted for 1.2.0-1.15.

I feel like this is a misunderstanding that both you and Danny ran into.

What I meant in the original proposal is that the last 2 _major_ connector 
versions are supported, with the latest receiving additional features.
(Provided that the previous major version still works against a 
currently supported Flink version!)
There will never be patch releases for a minor version if a newer minor 
version exists.


IOW, the minor/patch releases within a major version do not form a tree 
(like in Flink), but a line.


1.0.0 -> 1.0.1 -> 1.1.0 -> 1.2.0 -> ...
NOT
1.0.0 -> 1.0.1 -> 1.0.2
   |-> 1.1.0 -> 1.1.1

If we actually follow semantic versioning then it's just not necessary 
to publish a patch for a previous version.


So if 2.x exists, then (the latest) 2.x gets features and patches, and 
the latest 1.x gets patches.


I hope that clears things up.

On 20/09/2022 14:00, Jing Ge wrote:

Hi,

Thanks for starting this discussion. It is an interesting one and yeah, it
is a tough topic. It seems like a centralized release version schema
control for decentralized connector development ;-)

In general, I like this idea, not because it is a good one but because
there might be no better one (That's life!). The solution gives users an
easy life with the price of extra effort on the developer's part. But it is
a chicken and egg situation, i.e. developer friendly vs. user friendly. If
it is hard for developers to move forward, it will also be difficult for
users to get a new release, even if the version schema is user friendly.

I'd like to raise some questions/concerns to make sure we are on the same
page.

@Chesnay

c1) Imagine we have 2.0.0 for 1.15:

- 2.0.0-1.14 (patches)
- 2.0.0-1.15 (feature and patches)
===> new major release targeting 1.16 and we need to change code for new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)

There is no more 2.1.0-1.15 because only the latest version is receiving
new features.

b1) Even in the special cases where we need to break the rule, we should
avoid confusing users:

===> new major release targeting 1.16 and we need to change code for new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.1.0-1.16 (feature and patches)
===> now we want to break the rule to add features to the penultimate
version
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
 - 2.2.0-1.15 (patches, new features)  // 2.1.0-1.15 vs. 2.2.0-1.15,
have to choose 2.2.0-1.15 to avoid conflict
- 2.1.0-1.16 (feature and patches)

we have two options: 2.1.0-1.15 vs. 2.2.0-1.15, both will confuse users:
- Using 2.1.0-1.15 will conflict with the existing 2.1.0-1.16. The
connector version of "2.1.0-1.16" is actually 2.1.0 which means it has the
same code as 2.1.0-1.15 but in this case, it contains upgraded code.
- Using 2.2.0-1.15 will skip 2.1.0-1.15. Actually, it needs to skip all
occupied minor-1.16 versions, heads-up release manager!

c2) Allow me to use your example:

===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 of 1.17, whichever
happens first)
- 2.0.0-1.15 (feature and patches)
- 2.0.0-1.16 (feature and patches)

I didn't understand the part of "2.0.0-1.15 (features and patches)". After
1.16, only patches are accepted for 1.2.0-1.15.
It should be clearly defined how to bump up the connector's version number
for the new Flink version. If the connector major number would always bump
up, it would make less sense to use the Flink version as postfix. With the
same example, it should be:

===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 of 1.17, whichever
happens first)
  - 1.2.1-1.15 (new patches)
- 1.3.0-1.16 (feature and patches)
 - 1.4.0-1.16 (feature and patches, new features)
 - 2.0.0-1.16 (feature and patches, major upgrade of connector itself)

or

- 1.2.0-1.14 (patches)
- 1.2.0-1.15 (feature and patches)
 - 2.0.0 -1.15 (feature and patches, major upgrade of connector itself)
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 2.0.0-1.15 (patches)
 - 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)
- 2.2.0-1.16 (feature and patches, new features)

i.e. commonly, there should be no connector major version change when using
the Flink version postfix as the version schema. Special cases (which rarely
happen) are obviously allowed.

Best regards,
Jing

On Tue, Sep 20, 2022 at 10:57 AM Martijn Visser
wrote:


Hi all,

This is a tough topic, I also had to write things down a couple of times.
To summarize and add my thoughts:

a) I think everyone is agreeing that "Only the last 2 versions of a
connector are supported per supported Flink version, with only the latest
version receiving new features". In the current 

[jira] [Created] (FLINK-29355) Sql parse failed because Desc catalog.database.table is incorrectly parsed to desc catalog

2022-09-20 Thread Yunhong Zheng (Jira)
Yunhong Zheng created FLINK-29355:
-

 Summary: Sql parse failed because Desc catalog.database.table 
is incorrectly parsed to desc catalog 
 Key: FLINK-29355
 URL: https://issues.apache.org/jira/browse/FLINK-29355
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Yunhong Zheng
 Fix For: 1.17.0


If a user names their catalog 'catalog' and tries to describe a table using the 
syntax 'describe catalog.testDatabase.testTable', the statement will be 
incorrectly parsed as 'DESC CATALOG' instead of 'DESC TABLE'.

!image-2022-09-20-20-00-19-478.png|width=592,height=187!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Externalized connector release details

2022-09-20 Thread Jing Ge
Hi,

Thanks for starting this discussion. It is an interesting one and yeah, it
is a tough topic. It seems like a centralized release version schema
control for decentralized connector development ;-)

In general, I like this idea, not because it is a good one but because
there might be no better one (That's life!). The solution gives users an
easy life with the price of extra effort on the developer's part. But it is
a chicken and egg situation, i.e. developer friendly vs. user friendly. If
it is hard for developers to move forward, it will also be difficult for
users to get a new release, even if the version schema is user friendly.

I'd like to raise some questions/concerns to make sure we are on the same
page.

@Chesnay

c1) Imagine we have 2.0.0 for 1.15:

- 2.0.0-1.14 (patches)
- 2.0.0-1.15 (feature and patches)
===> new major release targeting 1.16 and we need to change code for new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
   - 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)

There is no more 2.1.0-1.15 because only the latest version is receiving
new features.

b1) Even in the special cases where we need to break the rule, we should
avoid confusing users:

===> new major release targeting 1.16 and we need to change code for new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.1.0-1.16 (feature and patches)
===> now we want to break the rule to add features to the penultimate
version
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.2.0-1.15 (patches, new features)  // 2.1.0-1.15 vs. 2.2.0-1.15,
have to choose 2.2.0-1.15 to avoid conflict
- 2.1.0-1.16 (feature and patches)

we have two options: 2.1.0-1.15 vs. 2.2.0-1.15, both will confuse users:
- Using 2.1.0-1.15 will conflict with the existing 2.1.0-1.16. The
connector version of "2.1.0-1.16" is actually 2.1.0 which means it has the
same code as 2.1.0-1.15 but in this case, it contains upgraded code.
- Using 2.2.0-1.15 will skip 2.1.0-1.15. Actually, it needs to skip all
occupied minor-1.16 versions, heads-up release manager!

c2) Allow me to use your example:

===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 of 1.17, whichever
happens first)
- 2.0.0-1.15 (feature and patches)
- 2.0.0-1.16 (feature and patches)

I didn't understand the part of "2.0.0-1.15 (features and patches)". After
1.16, only patches are accepted for 1.2.0-1.15.
It should be clearly defined how to bump up the connector's version number
for the new Flink version. If the connector major number would always bump
up, it would make less sense to use the Flink version as postfix. With the
same example, it should be:

===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 of 1.17, whichever
happens first)
 - 1.2.1-1.15 (new patches)
- 1.3.0-1.16 (feature and patches)
- 1.4.0-1.16 (feature and patches, new features)
- 2.0.0-1.16 (feature and patches, major upgrade of connector itself)

or

- 1.2.0-1.14 (patches)
- 1.2.0-1.15 (feature and patches)
- 2.0.0 -1.15 (feature and patches, major upgrade of connector itself)
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 2.0.0-1.15 (patches)
- 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)
   - 2.2.0-1.16 (feature and patches, new features)

i.e. commonly, there should be no connector major version change when using
the Flink version postfix as the version schema. Special cases (which rarely
happen) are obviously allowed.

Best regards,
Jing

On Tue, Sep 20, 2022 at 10:57 AM Martijn Visser 
wrote:

> Hi all,
>
> This is a tough topic, I also had to write things down a couple of times.
> To summarize and add my thoughts:
>
> a) I think everyone is agreeing that "Only the last 2 versions of a
> connector are supported per supported Flink version, with only the latest
> version receiving new features". In the current situation, that means that
> Flink 1.14 and Flink 1.15 would be supported for connectors. This results
> in a maximum of 4 supported connector versions.
>
> b1) In an ideal world, I would have liked Flink's APIs that are used by
> connectors to be versioned (that's why there's now a Sink V1 and a Sink
> V2). However, we're not there yet.
>
> b2) With regards to the remark of using @Internal APIs, one thing that we
> agreed to in previous discussions is that connectors shouldn't need to rely
> on @Internal APIs so that the connector surface also stabilizes.
>
> b3) In the end, I think what matters the most is the user's perception on
> versioning. So the first thing to establish would be the versioning for
> connectors itself. So you would indeed have a major.minor.patch scheme.
> Next is the compatibility of that scheme with a version of Flink. I do like
> Chesnay's approach for using the Scala suffixes idea. So you would have
> connector-version_flink-version. In the currently
> externalized 

[jira] [Created] (FLINK-29354) Support TO_DATE and TO_TIMESTAMP built-in function in the Table API

2022-09-20 Thread Luning Wang (Jira)
Luning Wang created FLINK-29354:
---

 Summary: Support TO_DATE and TO_TIMESTAMP built-in function in the 
Table API
 Key: FLINK-29354
 URL: https://issues.apache.org/jira/browse/FLINK-29354
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.16.0
Reporter: Luning Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29353) Support UNIX_TIMESTAMP built-in function in Table API

2022-09-20 Thread Luning Wang (Jira)
Luning Wang created FLINK-29353:
---

 Summary: Support UNIX_TIMESTAMP built-in function in Table API
 Key: FLINK-29353
 URL: https://issues.apache.org/jira/browse/FLINK-29353
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Table SQL / API
Affects Versions: 1.16.0
Reporter: Luning Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29352) Support CONVERT_TZ built-in function in Table API

2022-09-20 Thread Luning Wang (Jira)
Luning Wang created FLINK-29352:
---

 Summary: Support CONVERT_TZ built-in function in Table API
 Key: FLINK-29352
 URL: https://issues.apache.org/jira/browse/FLINK-29352
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Table SQL / API
Affects Versions: 1.16.0
Reporter: Luning Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29351) Enable input buffer floating for blocking shuffle

2022-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-29351:
---

 Summary: Enable input buffer floating for blocking shuffle
 Key: FLINK-29351
 URL: https://issues.apache.org/jira/browse/FLINK-29351
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


At the input gate, Flink needs exclusive buffers for each input channel. For 
jobs with large parallelism, this can easily cause the "Insufficient number of 
network buffers" error. This ticket aims to make all input network buffers 
floating for blocking shuffle, to reduce the possibility of that error. The 
change can also improve default blocking shuffle performance, because buffer 
floating increases buffer utilization.
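
For context, a rough sketch of the knobs that control exclusive vs. floating input buffers today (the option keys are the ones I believe are current; the values are arbitrary examples):
{code:java}
import org.apache.flink.configuration.Configuration;

public class NetworkBufferConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Exclusive buffers reserved per input channel; the total grows with parallelism.
        config.setInteger("taskmanager.network.memory.buffers-per-channel", 2);
        // Floating buffers shared by all channels of an input gate.
        config.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 8);
        System.out.println(config);
    }
}
{code}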



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

2022-09-20 Thread Piotr Nowojski
Fine by me. Thanks for driving this Lincoln :)

Best, Piotrek

wt., 20 wrz 2022 o 09:06 Lincoln Lee  napisał(a):

> Hi all,
> I'll start a vote if there are no more objections by this
> Thursday (9.22). Looking forward to your feedback!
>
> [1] Flip-260:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> [2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc
>
> Best,
> Lincoln Lee
>
>
> Lincoln Lee  于2022年9月19日周一 17:38写道:
>
> > Hi Jingsong,
> >Thank you for participating this discussion!  For the method name, I
> > think we should follow the new finish() method in `StreamOperator`,  the
> > BoundedOneInput might be removed in the future as discussed [1] before
> >
> > [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Jingsong Li  于2022年9月19日周一 10:13写道:
> >
> >> +1 to add `finish()` method to `TableFunction` only.
> >>
> >> Can we use `endInput` just like `BoundedOneInput`?
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee 
> >> wrote:
> >> >
> >> > Hi Dawid, Piotr,
> >> >Agree with you that add finish() method to `TableFunction` only.
> >> Other
> >> > `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`,
> >> > `AggregateTableFunction`) are not necessarily to have the finish
> >> > method(they can not emit records in legacy close() method).
> >> >
> >> > A `TableFunction` is used to correlate with the left table/stream, the
> >> > following example shows a case that user only select columns from the
> >> > correlated 'FeatureTF' (no left table column was selected):
> >> > ```
> >> > SELECT feature1, feature2, feature3
> >> > FROM MyTable t1
> >> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2,
> >> > feature3) ON TRUE
> >> > ```
> >> > the 'FeatureTF' can do some flushing work in legacy close() method and
> >> this
> >> > doesn't break any sql semantics, so I don't see any reason that we can
> >> > enforce users not do flushing work in new finish() method. I've
> updated
> >> the
> >> > flip doc to limit the change only for `TableFunction`[1].
> >> >
> >> > For the more powerful `ProcessFunction`, I'd like to share some
> >> thoughts:
> >> > There indeed exists requirements for advanced usage in Table/SQL,
> even a
> >> > further UD-Operator, e.g., UD-Join for user controlled join logic
> which
> >> can
> >> > not simply expressed by SQL. This is an interesting topic, expect more
> >> > discussions on this.
> >> >
> >> >
> >> > [1]
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> >> >
> >> > Best,
> >> > Lincoln Lee
> >> >
> >> >
> >> > Piotr Nowojski  于2022年9月15日周四 22:39写道:
> >> >
> >> > > Hi Dawid, Lincoln,
> >> > >
> >> > > I would tend to agree with Dawid. It seems to me like
> `TableFunction`
> >> is
> >> > > the one that needs to be taken care of. Other types of
> >> > > `UserDefinedFunction` wouldn't be able to emit anything from the
> >> `finish()`
> >> > > even if we added it. And if we added `finish(Collector out)` to
> >> them, it
> >> > > would create the same problems (how to pass the output type) that
> >> prevented
> >> > > us from adding `finish()` to all functions in the DataStream API.
> >> > >
> >> > > However I'm not sure what should be the long term solution for the
> >> Table
> >> > > API. For the DataStream API we wanted to provide a new, better and
> >> more
> >> > > powerful `ProcessFunction` for all of the unusual use cases, that
> >> currently
> >> > > require the use of `StreamOperator` API instead of `DataStream`
> >> functions.
> >> > > I don't know what would be an alternative in the Table API.
> >> > >
> >> > > Dawid, who do you think we should ping from the Table API/SQL teams
> >> to chip
> >> > > in?
> >> > >
> >> > > Best,
> >> > > Piotrek
> >> > >
> >> > > czw., 15 wrz 2022 o 12:38 Dawid Wysakowicz 
> >> > > napisał(a):
> >> > >
> >> > > > Hey Lincoln,
> >> > > >
> >> > > > Thanks for opening the discussion.
> >> > > >
> >> > > > To be honest I am not convinced if emitting from close there is a
> >> > > > contract that was envisioned and thus should be maintained. As far
> >> as I
> >> > > > can see it does affect only the TableFunction, because it has the
> >> > > > collect method. None of the other UDFs (ScalarFunction,
> >> > > > AggregateFunction) have means to emit records from close().
> >> > > >
> >> > > > To be honest I am not sure what would be the consequences of
> >> interplay
> >> > > > with other operators which expect TableFunction to emit only when
> >> eval
> >> > > > is called. Not sure if there are such.
> >> > > >
> >> > > > If it is a thing that we are certain we want to support, I'd be
> much
> >> > > > more comfortable adding finish() to the TableFunction instead.
> >> Would be
> >> > > > happy to hear opinions from the Table API folks.
> >> > > >
> >> > > > Best,
> >> > > >
> 

Re: [DISCUSS] Externalized connector release details

2022-09-20 Thread Martijn Visser
Hi all,

This is a tough topic, I also had to write things down a couple of times.
To summarize and add my thoughts:

a) I think everyone is agreeing that "Only the last 2 versions of a
connector are supported per supported Flink version, with only the latest
version receiving new features". In the current situation, that means that
Flink 1.14 and Flink 1.15 would be supported for connectors. This results
in a maximum of 4 supported connector versions.

b1) In an ideal world, I would have liked Flink's APIs that are used by
connectors to be versioned (that's why there's now a Sink V1 and a Sink
V2). However, we're not there yet.

b2) With regards to the remark of using @Internal APIs, one thing that we
agreed to in previous discussions is that connectors shouldn't need to rely
on @Internal APIs so that the connector surface also stabilizes.

b3) In the end, I think what matters the most is the user's perception on
versioning. So the first thing to establish would be the versioning for
connectors itself. So you would indeed have a major.minor.patch scheme.
Next is the compatibility of that scheme with a version of Flink. I do like
Chesnay's approach for using the Scala suffixes idea. So you would have
connector-version_flink-version. In the currently
externalized Elasticsearch connector, we would end up with 3.0.0_1.14 and
3.0.0_1.15 as first released versions. If a new Flink version would be
released that doesn't require code changes to the connector, the released
version would be 3.0.0_1.16. That means that there have been no connector
code changes (no patches, no new features) when comparing this across
different Flink versions.

b4) Now using the example that Chesnay provided (yet slightly modified to
match it with the Elasticsearch example I've used above), there exists an
Elasticsearch connector 3.0.0_1.15. Now in Flink 1.16, there's a new API
that we want to use, which is a test util. It would result in version
3.1.0_1.16 for the new Flink version. Like Chesnay said, for the sake of
argument, at the same time we also had some pending changes for the 1.15
connector (let's say exclusive to 1.15; some workaround for a bug or smth),
so we would also end up with 3.1.0-1.15. I agree with Danny that we should
avoid this situation: the perception of the user would be that there's no
divergence between the 3.1.0 version, except the compatible Flink version.

I really am wondering how often we will run in that situation. From what
I've seen so far with connectors is that bug fixes always end up in both
the release branch and the master branch. The only exceptions are test
stabilities or documentation fixes, but if we only resolve these, they
wouldn't need to be released. If such a special occasion would occur, I
would be inclined to go for a hotfix approach, where you would end up with
3.0.0.1_1.15.

c) Branch wise, I think we should end up with connector-version_flink-version. So again the Elasticsearch example,
at this moment there would be 3.0.0_1.14 and 3.0.0_1.15 branches.

Best regards,

Martijn


Re: [DISCUSS] Externalized connector release details

2022-09-20 Thread Chesnay Schepler

c)
@Ryan:
I'm generally fine with leaving it up to the connector on how to 
implement Flink version-specific behavior, so long as the branching 
model stays consistent (primarily so that the release process is 
identical and we can share infrastructure).


@Danny:
In a single branch per version model we should update the version when 
making a change specific to a given Flink version.
That a change for the 1.16 code can affect the 1.15 version is a price 
I'm willing to pay.


If you use separate branches this becomes more complicated (surprise!), 
because then you likely end up with diverging code-bases using the same 
version.

Because either
    a) you have 1.0.0 for both with diverging Flink version-specific code,
    b) create 1.1.0-1.16, but then what of 1.1.0-1.15? Do you skip 
1.1.0 for 1.15? That would just be strange.


c1-3)
I don't think we need to limit ourselves to only adding patches for 
Flink n-1.


In my mind it would progress like this:

- 1.0.0-1.14 (features & patches)
===> 1.15 is released, let's support it; adding some version-specific code
- 1.1.0-1.14 (feature and patches)
- 1.1.0-1.15 (feature and patches)
===> general improvements to the v1 connector
- 1.2.0-1.14 (feature and patches)
- 1.2.0-1.15 (feature and patches)
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 of 1.17, whichever happens 
first)
- 2.0.0-1.15 (feature and patches)
- 2.0.0-1.16 (feature and patches)

d)
Does the social aspect actually require a "main" branch, or just a 
_default_ branch on which development actually takes place?
Unless we develop a habit of creating a significant number of major 
releases per year, we could use the branch of the latest connector 
version as the default.


On 16/09/2022 20:03, Danny Cranmer wrote:

c) I am torn here. I do not like the idea that the connector code could
diverge for the same connector version, ie 2.1.0-1.15 and 2.1.0-1.16. If
the Flink version change requires a change to the connector code, then this
should result in a new connector version in my opinion. Going back to your
example of API changes, this means we could end up supporting new features
for multiple Flink versions with potentially different @Internal
implementations. Where do we draw the line here? Hypothetically if the
@Internal Flink apis change substantially, but do not impact the public
connector interface, this could still end up with the same connector
version. Example:

- 1.0.0-1.14 (uses @Internal connector API v1)
- 1.0.0-1.15 (uses @Internal connector API v2)

But on the flip side, this example is very good "let's say exclusive to
1.15; some workaround for a bug or smth", and I am not sure how the single
branch per version approach would solve it.

c1) We would likely need to do something like this:

- 1.0.0-1.14 (patches)
- 1.1.0-1.15 (contains some 1.15 bugfix) (feature and patches)

c2) When 1.16 is released, we would need to remove the 1.15 bugfix, and
therefore change the connector code:

- 1.0.0-1.14
- 1.1.0-1.15 (contains some 1.15 bugfix) (patches)
- 1.2.0-1.16 (features and patches)

c3) But following this approach I can see a case where we end up with a
version gap to support Flink (n-1) patches:

- 1.0.0-1.14
- 1.1.0-1.15 (contains some 1.15 bugfix) (patches)
- 1.2.0-1.16
- 1.3.0-1.16
- 1.4.0-1.16 (patches)
- 1.5.0-1.16 (features and patches)

I think we need some animations to help with these!

d) Agree with the "Socially" point generally.

Thanks,

On Fri, Sep 16, 2022 at 1:30 PM Ryan Skraba  wrote:


I had to write down a diagram to fully understand the discussion :D

If I recall correctly, during the externalization discussion, the "price
to pay" for the (many) advantages of taking connectors out of the main repo
was to maintain and manually consult a compatibility matrix per connector.
I'm not a fan of that approach, and your example of diverging code between
2.1.0-1.15 and 2.1.0-1.16 is a good reason why.

b) I think your proposal here is a viable alternative.

c) In my experience, the extra effort of correctly cherry-picking to the
"right" branches adds a small burden to each commit and release event.

The biggest challenge will be for committers for each connector to be
mindful of which versions are "still in the game" (but this is also true
for the compatibility matrix approach).  Two major versions of connectors
multiplied by two versions of Flink is up to three cherry-picks per commit
-- plus one if the connector is currently being migrated and exists
simultaneously inside and outside the main repo, plus another for the
previous still-supported version of flink.  It's going to take some
education effort!

Weighing in on the shim approach: this might be something to leave up to
each connector -- I can see it being easier or more relevant for some
connectors than others to use dedicated branches versus dedicated modules
per flink version, and this might evolve with the 

[jira] [Created] (FLINK-29350) Add a note for swapping planner jar in Hive dependencies page

2022-09-20 Thread luoyuxia (Jira)
luoyuxia created FLINK-29350:


 Summary: Add a note for swapping planner jar in Hive dependencies 
page
 Key: FLINK-29350
 URL: https://issues.apache.org/jira/browse/FLINK-29350
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive, Documentation
Affects Versions: 1.16.0
Reporter: luoyuxia
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29349) Use state ttl instead of timer to clean up state in proctime over aggregate

2022-09-20 Thread lincoln lee (Jira)
lincoln lee created FLINK-29349:
---

 Summary: Use state ttl instead of timer to clean up state in 
proctime over aggregate
 Key: FLINK-29349
 URL: https://issues.apache.org/jira/browse/FLINK-29349
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.15.2, 1.16.0
Reporter: lincoln lee
 Fix For: 1.17.0


Currently we rely on timer-based state cleaning in proctime over 
aggregate; this can be optimized to use state TTL, which is more efficient.
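
For illustration only (not the planner change itself), a minimal sketch of how state TTL is attached to a state descriptor; the state name and retention time are arbitrary examples:
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class StateTtlSketch {

    static ValueStateDescriptor<Long> ttlAwareDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1)) // arbitrary retention, e.g. driven by table.exec.state.ttl
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("overAggAcc", Long.class);
        // The state backend expires entries lazily; no per-key cleanup timer is needed.
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
{code}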



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29348) The DPP(dynamic partition pruning) can not work with adaptive batch scheduler

2022-09-20 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-29348:
--

 Summary: The DPP(dynamic partition pruning) can not work with 
adaptive batch scheduler
 Key: FLINK-29348
 URL: https://issues.apache.org/jira/browse/FLINK-29348
 Project: Flink
  Issue Type: Bug
Reporter: Lijie Wang


When running TPC-DS with both DPP (dynamic partition pruning) and the adaptive 
batch scheduler enabled, q14a.sql fails due to the following exception:
{code:java}
2022-09-20 10:34:18,244 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job q14a.sql 
(6d4355bdde514be083b9762e286626d2) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:299)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:125)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1031)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:588)
 ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
at sun.reflect.GeneratedMethodAccessor69.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_332]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_332]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
 ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
 ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
 ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
 ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at akka.actor.Actor.aroundReceive(Actor.scala:537) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
at 

Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

2022-09-20 Thread Lincoln Lee
Hi all,
   I'll start a vote if there are no more objections by this
Thursday (9.22). Looking forward to your feedback!

[1] Flip-260:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
[2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc

Best,
Lincoln Lee


Lincoln Lee  于2022年9月19日周一 17:38写道:

> Hi Jingsong,
>Thank you for participating this discussion!  For the method name, I
> think we should follow the new finish() method in `StreamOperator`,  the
> BoundedOneInput might be removed in the future as discussed [1] before
>
> [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
>
> Best,
> Lincoln Lee
>
>
> Jingsong Li  于2022年9月19日周一 10:13写道:
>
>> +1 to add `finish()` method to `TableFunction` only.
>>
>> Can we use `endInput` just like `BoundedOneInput`?
>>
>> Best,
>> Jingsong
>>
>> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee 
>> wrote:
>> >
>> > Hi Dawid, Piotr,
>> >Agree with you that add finish() method to `TableFunction` only.
>> Other
>> > `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`,
>> > `AggregateTableFunction`) are not necessarily to have the finish
>> > method(they can not emit records in legacy close() method).
>> >
>> > A `TableFunction` is used to correlate with the left table/stream, the
>> > following example shows a case that user only select columns from the
>> > correlated 'FeatureTF' (no left table column was selected):
>> > ```
>> > SELECT feature1, feature2, feature3
>> > FROM MyTable t1
>> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2,
>> > feature3) ON TRUE
>> > ```
>> > the 'FeatureTF' can do some flushing work in legacy close() method and
>> this
>> > doesn't break any sql semantics, so I don't see any reason that we can
>> > enforce users not do flushing work in new finish() method. I've updated
>> the
>> > flip doc to limit the change only for `TableFunction`[1].
>> >
>> > For the more powerful `ProcessFunction`, I'd like to share some
>> thoughts:
>> > There indeed exists requirements for advanced usage in Table/SQL, even a
>> > further UD-Operator, e.g., UD-Join for user controlled join logic which
>> can
>> > not simply expressed by SQL. This is an interesting topic, expect more
>> > discussions on this.
>> >
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
>> >
>> > Best,
>> > Lincoln Lee
>> >
>> >
>> > Piotr Nowojski  于2022年9月15日周四 22:39写道:
>> >
>> > > Hi Dawid, Lincoln,
>> > >
>> > > I would tend to agree with Dawid. It seems to me like `TableFunction`
>> is
>> > > the one that needs to be taken care of. Other types of
>> > > `UserDefinedFunction` wouldn't be able to emit anything from the
>> `finish()`
>> > > even if we added it. And if we added `finish(Collector out)` to
>> them, it
>> > > would create the same problems (how to pass the output type) that
>> prevented
>> > > us from adding `finish()` to all functions in the DataStream API.
>> > >
>> > > However I'm not sure what should be the long term solution for the
>> Table
>> > > API. For the DataStream API we wanted to provide a new, better and
>> more
>> > > powerful `ProcessFunction` for all of the unusual use cases, that
>> currently
>> > > require the use of `StreamOperator` API instead of `DataStream`
>> functions.
>> > > I don't know what would be an alternative in the Table API.
>> > >
>> > > Dawid, who do you think we should ping from the Table API/SQL teams
>> to chip
>> > > in?
>> > >
>> > > Best,
>> > > Piotrek
>> > >
>> > > czw., 15 wrz 2022 o 12:38 Dawid Wysakowicz 
>> > > napisał(a):
>> > >
>> > > > Hey Lincoln,
>> > > >
>> > > > Thanks for opening the discussion.
>> > > >
>> > > > To be honest I am not convinced if emitting from close there is a
>> > > > contract that was envisioned and thus should be maintained. As far
>> as I
>> > > > can see it does affect only the TableFunction, because it has the
>> > > > collect method. None of the other UDFs (ScalarFunction,
>> > > > AggregateFunction) have means to emit records from close().
>> > > >
>> > > > To be honest I am not sure what would be the consequences of
>> interplay
>> > > > with other operators which expect TableFunction to emit only when
>> eval
>> > > > is called. Not sure if there are such.
>> > > >
>> > > > If it is a thing that we are certain we want to support, I'd be much
>> > > > more comfortable adding finish() to the TableFunction instead.
>> Would be
>> > > > happy to hear opinions from the Table API folks.
>> > > >
>> > > > Best,
>> > > >
>> > > > Dawid
>> > > >
>> > > > On 14/09/2022 15:55, Lincoln Lee wrote:
>> > > > > Thanks @Piort for your valuable inputs!
>> > > > >
>> > > > > I did a quick read of the previous discussion you mentioned,
>> seems my
>> > > > flip
>> > > > > title doesn't give a clear scope here and make some confusions,
>> if my
>> > > > > understanding is correct, the UDFs in your context is the user
>> > > 

[jira] [Created] (FLINK-29347) Failed to restore from list state with empty protobuf object

2022-09-20 Thread shen (Jira)
shen created FLINK-29347:


 Summary: Failed to restore from list state with empty protobuf 
object
 Key: FLINK-29347
 URL: https://issues.apache.org/jira/browse/FLINK-29347
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0, 1.14.2
Reporter: shen


I use a protobuf-generated class in a union list state.
When my Flink job restores from a checkpoint, I get the following exception:
{code:java}
Caused by: java.lang.RuntimeException: Could not create class 
com.MY_PROTOBUF_GENERATED_CLASS
at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) 
~[my-lib-0.1.1-SNAPSHOT.jar:?] 
at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) 
~[my-lib-0.1.1-SNAPSHOT.jar:?] 
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) 
~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) 
~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] 
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
more bytes left. 
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) 
~[flink-dist_2.12-1.14.4.jar:1.14.4] 
at