[jira] [Created] (FLINK-29369) Commit delete file failure due to Checkpoint aborted
Jingsong Lee created FLINK-29369: Summary: Commit delete file failure due to Checkpoint aborted Key: FLINK-29369 URL: https://issues.apache.org/jira/browse/FLINK-29369 Project: Flink Issue Type: Bug Components: Table Store Reporter: Jingsong Lee Fix For: table-store-0.3.0, table-store-0.2.1 After a checkpoint abort, the files in cp5 may fall into cp6. Because a compaction commit deletes files first and then adds them, this may lead to:
- Delete a file
- Add the same file again
This causes the deleted file not to be found. We need to properly process the merge of the compaction files. -- This message was sent by Atlassian Jira (v8.20.10#820010)
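For illustration only (this is not Table Store's actual commit code; all names here are hypothetical), the needed merge amounts to canceling out delete/add pairs that reference the same file within a single commit:
{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: compute the net deletions of a commit so that a file
// that is deleted and re-added within the same commit yields no delete at all.
public class CompactionChangeMerger {
    public static List<String> netDeletes(List<String> deletes, List<String> adds) {
        Set<String> readded = new HashSet<>(adds);
        List<String> result = new ArrayList<>();
        for (String file : deletes) {
            // Deleting a file that the same commit re-adds would make the later
            // "add" point to a missing file, so such pairs must cancel out.
            if (!readded.contains(file)) {
                result.add(file);
            }
        }
        return result;
    }
}
{code}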
[jira] [Created] (FLINK-29368) Modify DESCRIBE statement docs for new syntax
Yunhong Zheng created FLINK-29368: - Summary: Modify DESCRIBE statement docs for new syntax Key: FLINK-29368 URL: https://issues.apache.org/jira/browse/FLINK-29368 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.17.0 Reporter: Yunhong Zheng Fix For: 1.17.0 In Flink 1.17.0, the DESCRIBE statement syntax will be changed to DESCRIBE/DESC [EXTENDED] [catalog_name.][database_name.]table_name [PARTITION(partition_spec)] [col_name]. So the docs for this statement need to be updated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29367) Avoid manifest corruption for incorrect checkpoint recovery
Jingsong Lee created FLINK-29367: Summary: Avoid manifest corruption for incorrect checkpoint recovery Key: FLINK-29367 URL: https://issues.apache.org/jira/browse/FLINK-29367 Project: Flink Issue Type: Bug Components: Table Store Reporter: Jingsong Lee Assignee: Caizhi Weng Fix For: table-store-0.3.0 When the job has run to checkpoint N, if the user recovers from an old checkpoint (such as checkpoint N-5), the sink of the current FTS (Flink Table Store) will cause manifest corruption, because duplicate files may be committed. We should avoid such corruption; the storage should be robust enough to handle this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29366) Use flink shaded jackson to parse flink-conf.yaml
Yuan Kui created FLINK-29366: Summary: Use flink shaded jackson to parse flink-conf.yaml Key: FLINK-29366 URL: https://issues.apache.org/jira/browse/FLINK-29366 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.13.3 Reporter: Yuan Kui Currently we use a simple implementation (org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource) to parse flink-conf.yaml, which can only parse key-value pairs. Although there have been discussions on this issue historically (see: https://github.com/stratosphere/stratosphere/issues/113), I think that in actual production environments we often need to put complex structures into flink-conf.yaml. A YAML library is required to parse these, so I suggest using a YAML library to parse flink-conf.yaml instead of our own implementation. In fact, the flink-core module already has a dependency on flink-shaded-jackson, which can parse the YAML format, so we can use this jar without adding more dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
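For illustration, a YAML library handles nested structures out of the box; a minimal sketch using Jackson's YAML support (shown with the plain jackson-dataformat-yaml coordinates; flink-shaded-jackson relocates the same classes under a shaded package):
{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.io.File;
import java.io.IOException;

public class YamlConfLoader {
    public static void main(String[] args) throws IOException {
        // An ObjectMapper backed by a YAMLFactory parses full YAML, including
        // the nested mappings and lists that loadYAMLResource cannot handle.
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        JsonNode root = mapper.readTree(new File("flink-conf.yaml"));
        // Works for real YAML nesting rather than only flat dotted keys:
        System.out.println(root.at("/security/kerberos/login/principal").asText());
    }
}
{code}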
[jira] [Created] (FLINK-29365) Millisecond behind latest jumps after Flink 1.15.2 upgrade
Wilson Wu created FLINK-29365: - Summary: Millisecond behind latest jumps after Flink 1.15.2 upgrade Key: FLINK-29365 URL: https://issues.apache.org/jira/browse/FLINK-29365 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Affects Versions: 1.15.2 Environment: Redeployment from 1.14.4 to 1.15.2 Reporter: Wilson Wu Attachments: Screen Shot 2022-09-19 at 2.50.56 PM.png (First time filing a ticket in the Flink community, please let me know if there are any guidelines I need to follow.) I noticed a very strange behavior with a recent version bump from Flink 1.14.4 to 1.15.2. My project consumes around 30K records per second from a sharded Kinesis stream, and during the version upgrade it follows the best practice of first triggering a savepoint from the running job, starting the new job from the savepoint, and then removing the old job. So far so good, and the above logic has been tested multiple times without any issue on 1.14.4. Usually, after a version upgrade, our job has a few minutes of delay in millisecond behind latest, but it catches up quickly (within 30 minutes). Our savepoint is around one hundred MB, and our job DAG becomes 90-100% busy with some backpressure when we redeploy, but after 10-20 minutes it goes back to normal. Then the strange thing happened: when I tried to redeploy with the 1.15.2 upgrade from a running 1.14.4 job, I could see that a savepoint had been created and the new job was running, and all the metrics looked fine, except that suddenly [millisecond behind the latest|https://flink.apache.org/news/2019/02/25/monitoring-best-practices.html] jumped to 10 hours!! And it takes days for my application to catch up with the latest records in the Kinesis stream. I don't understand why it jumps from 0 seconds to 10+ hours when we restart the new job. The only main change I introduced with the version bump is changing [failOnError|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.html] from true to false, but I don't think this is the root cause. I tried to redeploy the new 1.15.2 job by changing our parallelism, and redeploying a job from 1.15.2 does not introduce a big delay, so I assume the issue above only happens when we bump the version from 1.14.4 to 1.15.2 (note the attached screenshot)? I did try the bump twice and saw the same 10+ hour jump in delay; we do not have changes related to any timezones. Please let me know if this can be filed as a bug, as I do not have a running project with all the Kinesis setup available that can reproduce the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29364) Root cause of Exceptions thrown in the SourceReader start() method gets "swallowed".
Alexander Fedulov created FLINK-29364: - Summary: Root cause of Exceptions thrown in the SourceReader start() method gets "swallowed". Key: FLINK-29364 URL: https://issues.apache.org/jira/browse/FLINK-29364 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.15.2 Reporter: Alexander Fedulov If an exception is thrown in the {_}SourceReader{_}'s _start()_ method, its root cause does not get captured. The details are still available at [Task.java#L758|https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758], but the execution falls through to [Task.java#L780|https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780] and cancels the source invokable without recording the actual reason, discarding the root cause. -- This message was sent by Atlassian Jira (v8.20.10#820010)
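The underlying pattern, reduced to a hypothetical standalone example (this is not the actual Task.java code): if the later failure is thrown without chaining the caught exception, the root cause disappears from the logs.
{code:java}
public class CauseSwallowingExample {

    static void startReader() {
        throw new IllegalStateException("real root cause: reader start failed");
    }

    public static void main(String[] args) {
        try {
            startReader();
        } catch (Exception e) {
            // Lossy: 'e' is not passed as the cause, so only the generic
            // cancellation message ever surfaces.
            throw new RuntimeException("source invokable was canceled");
            // Cause-preserving variant:
            // throw new RuntimeException("source invokable was canceled", e);
        }
    }
}
{code}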
[jira] [Created] (FLINK-29363) Allow web ui to fully redirect to other page
Zhenqiu Huang created FLINK-29363: - Summary: Allow web ui to fully redirect to other page Key: FLINK-29363 URL: https://issues.apache.org/jira/browse/FLINK-29363 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Affects Versions: 1.15.2 Reporter: Zhenqiu Huang In a streaming platform system, the web UI usually integrates with an internal authentication and authorization system. When validation fails, the request needs to be redirected to a landing page. This doesn't work for AJAX requests. It would be great to make the web UI configurable to allow an automatic full redirect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29362) Allow loading dynamic config for kerberos authentication in CliFrontend
Biao Geng created FLINK-29362: - Summary: Allow loading dynamic config for kerberos authentication in CliFrontend Key: FLINK-29362 URL: https://issues.apache.org/jira/browse/FLINK-29362 Project: Flink Issue Type: Improvement Components: Command Line Client Reporter: Biao Geng In the [code|https://github.com/apache/flink/blob/97f5a45cd035fbae37a7468c6f771451ddb4a0a4/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L1167], Flink's client will try to {{SecurityUtils.install(new SecurityConfiguration(cli.configuration));}} with configs (e.g. {{security.kerberos.login.principal}} and {{security.kerberos.login.keytab}}) taken only from flink-conf.yaml. If users specify the above 2 configs via the -D option, this will not take effect on the client, as {{cli.parseAndRun(args)}} is executed after installing the security configs from flink-conf.yaml. However, if a user specifies principal A in the client's flink-conf.yaml and uses the -D option to specify principal B, the launched YARN container will use principal B, though the job is submitted on the client side with principal A. Such behavior can be misleading, as Flink provides 2 ways to set a config but does not keep consistency between client and cluster. It also affects users who want to use Flink with Kerberos, as they must modify flink-conf.yaml whenever they want to use another Kerberos user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
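One conceivable direction, sketched under stated assumptions (obtaining the parsed -D properties before {{parseAndRun}} is the hypothetical part; only {{Configuration}}, {{SecurityConfiguration}} and {{SecurityUtils.install}} are existing Flink APIs): merge the dynamic properties into the effective configuration before installing the security context.
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;

public class SecurityInstallSketch {
    // 'dynamicProperties' stands for the already-parsed -D options.
    static void installSecurity(Configuration flinkConf, Configuration dynamicProperties)
            throws Exception {
        Configuration effective = new Configuration(flinkConf);
        effective.addAll(dynamicProperties); // -D overrides win over flink-conf.yaml
        SecurityUtils.install(new SecurityConfiguration(effective));
    }
}
{code}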
Re: Kubernetes Operator 1.2.0 release
Thanks, +1 for the schedule. On Tue, Sep 20, 2022 at 6:20 AM Yang Wang wrote:
> Thanks Gyula for managing the release.
> +1 for the time schedule.
> Best,
> Yang
>
> Őrhidi Mátyás wrote on Mon, Sep 19, 2022 at 22:28:
> > Thanks Gyula!
> > Sounds good! Happy to help as always.
> > Cheers,
> > Matyas
> >
> > On Mon, Sep 19, 2022 at 1:37 PM Gyula Fóra wrote:
> > > Hi Devs!
> > > The originally planned (October 1) release date for 1.2.0 is fast approaching and we are already slightly behind schedule. There are a couple of outstanding bug tickets, with 2 known blockers at the moment that should be fixed in the next few days.
> > > As we are not aware of any larger critical issues or outstanding features, I propose the following adjusted release schedule:
> > > *Feature Freeze: September 23. Release Branch Cut & RC1: September 28*
> > > Hopefully then we can finalize the release somewhere in the first week of October :)
> > > I volunteer as the release manager.
> > > Cheers,
> > > Gyula
[jira] [Created] (FLINK-29361) How to set headers with the new Flink KafkaSink
Xin Hao created FLINK-29361: --- Summary: How to set headers with the new Flink KafkaSink Key: FLINK-29361 URL: https://issues.apache.org/jira/browse/FLINK-29361 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Xin Hao I'm using Flink 1.15.2. When I try to migrate to the new KafkaSink, it seems that it's not possible to add Kafka record headers. I think we should add this feature, or document it if we already have it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
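For what it's worth, one possible workaround with the existing API is to implement {{KafkaRecordSerializationSchema}} directly and attach headers to the {{ProducerRecord}} it returns; a sketch (topic and header values are placeholders):
{code:java}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

public class HeaderSettingSchema implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        RecordHeaders headers = new RecordHeaders();
        headers.add("source-app", "my-app".getBytes(StandardCharsets.UTF_8));
        return new ProducerRecord<>(
                "my-topic",                               // placeholder topic
                null,                                     // partition: default partitioner
                null,                                     // key
                element.getBytes(StandardCharsets.UTF_8), // value
                headers);
    }
}
{code}
Such a schema can then be passed to {{KafkaSink.<String>builder().setRecordSerializer(new HeaderSettingSchema())}}; still, a first-class setter or documentation would make this much more discoverable.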
[SUMMARY] Flink 1.16 release sync of 2022-09-20
I would like to give you a brief update on the Flink 1.16 release sync meeting of 2022-09-20.

*We created release-1.16 rc0 last Wednesday (14th of September 2022).*
*Currently, there are 3 blocker issues being worked on: FLINK-29219, FLINK-29274, FLINK-29315. Many thanks to these contributors and reviewers.*
*We still have some critical test instabilities [1] that need to be resolved.*
*We are preparing the release notes [2] and the release announcement [3].*

For more information about Flink release 1.16, you can refer to https://cwiki.apache.org/confluence/display/FLINK/1.16+Release

The next Flink release sync will be on Tuesday the 27th of September at 9am CEST / 3pm China Standard Time / 7am UTC. The link can be found on the following page: https://cwiki.apache.org/confluence/display/FLINK/1.16+Release#id-1.16Release-Syncmeeting

On behalf of all the release managers, best regards, Xingbo

[1] https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20priority%20in%20(Blocker%2C%20Critical)%20AND%20fixVersion%20%3D%201.16.0%20ORDER%20BY%20summary%20ASC%2C%20priority%20DESC
[2] https://github.com/apache/flink/pull/20859
[3] https://docs.google.com/document/d/1rIBNpzJulqEKJCuYtWtG-vDmSsGpN9sip_ewpzMequ0/edit#
[jira] [Created] (FLINK-29358) Pulsar Table Connector testing
Yufei Zhang created FLINK-29358: --- Summary: Pulsar Table Connector testing Key: FLINK-29358 URL: https://issues.apache.org/jira/browse/FLINK-29358 Project: Flink Issue Type: Sub-task Components: Connectors / Pulsar Affects Versions: 1.17.0 Reporter: Yufei Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29359) Pulsar Table Connector pom config and packaging
Yufei Zhang created FLINK-29359: --- Summary: Pulsar Table Connector pom config and packaging Key: FLINK-29359 URL: https://issues.apache.org/jira/browse/FLINK-29359 Project: Flink Issue Type: Sub-task Components: Connectors / Pulsar Affects Versions: 1.17.0 Reporter: Yufei Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29360) Pulsar Table Connector Documentation
Yufei Zhang created FLINK-29360: --- Summary: Pulsar Table Connector Documentation Key: FLINK-29360 URL: https://issues.apache.org/jira/browse/FLINK-29360 Project: Flink Issue Type: Sub-task Components: Connectors / Pulsar Affects Versions: 1.17.0 Reporter: Yufei Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29356) Pulsar Table Source code: implementation
Yufei Zhang created FLINK-29356: --- Summary: Pulsar Table Source code: implementation Key: FLINK-29356 URL: https://issues.apache.org/jira/browse/FLINK-29356 Project: Flink Issue Type: Sub-task Components: Connectors / Pulsar Affects Versions: 1.17.0 Reporter: Yufei Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29357) Pulsar Table Sink code: implementation
Yufei Zhang created FLINK-29357: --- Summary: Pulsar Table Sink code: implementation Key: FLINK-29357 URL: https://issues.apache.org/jira/browse/FLINK-29357 Project: Flink Issue Type: Sub-task Components: Connectors / Pulsar Affects Versions: 1.17.0 Reporter: Yufei Zhang -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Externalized connector release details
> After 1.16, only patches are accepted for 1.2.0-1.15.

I feel like this is a misunderstanding that both you and Danny ran into. What I meant in the original proposal is that the last 2 _major_ connector versions are supported, with the latest receiving additional features. (Provided that the previous major version still works against a currently supported Flink version!) There will never be patch releases for a minor version if a newer minor version exists. IOW, the minor/patch releases within a major version do not form a tree (like in Flink), but a line:

1.0.0 -> 1.0.1 -> 1.1.0 -> 1.2.0 -> ...

NOT

1.0.0 -> 1.0.1 -> 1.0.2
       |-> 1.1.0 -> 1.1.1

If we actually follow semantic versioning, then it's just not necessary to publish a patch for a previous version. So if 2.x exists, then (the latest) 2.x gets features and patches, and the latest 1.x gets patches. I hope that clears things up.

On 20/09/2022 14:00, Jing Ge wrote: Hi, Thanks for starting this discussion. It is an interesting one and yeah, it is a tough topic. It seems like a centralized release version schema control for decentralized connector development ;-) In general, I like this idea, not because it is a good one but because there might be no better one (that's life!). The solution gives users an easy life at the price of extra effort on the developers' part. But it is a chicken-and-egg situation, i.e. developer friendly vs. user friendly. If it is hard for developers to move forward, it will also be difficult for users to get a new release, even if the version schema is user friendly. I'd like to raise some questions/concerns to make sure we are on the same page. @Chesnay

c1) Imagine we have 2.0.0 for 1.15:
- 2.0.0-1.14 (patches)
- 2.0.0-1.15 (feature and patches)
===> new major release targeting 1.16, and we need to change code for the new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)
There is no more 2.1.0-1.15 because only the latest version is receiving new features.

b1) Even if in some special cases we need to break the rule, we should avoid confusing users:
===> new major release targeting 1.16, and we need to change code for the new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.1.0-1.16 (feature and patches)
===> now we want to break the rule to add features to the penultimate version
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.2.0-1.15 (patches, new features) // 2.1.0-1.15 vs. 2.2.0-1.15; we have to choose 2.2.0-1.15 to avoid conflict
- 2.1.0-1.16 (feature and patches)
We have two options, 2.1.0-1.15 vs. 2.2.0-1.15, and both will confuse users:
- Using 2.1.0-1.15 will conflict with the existing 2.1.0-1.16. The connector version of "2.1.0-1.16" is actually 2.1.0, which means it has the same code as 2.1.0-1.15, but in this case it contains upgraded code.
- Using 2.2.0-1.15 will skip 2.1.0-1.15. Actually, it needs to skip all occupied minor-1.16 versions; heads-up, release manager!

c2) Allow me to use your example:
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 or 1.17, whichever happens first)
- 2.0.0-1.15 (feature and patches)
- 2.0.0-1.16 (feature and patches)
I didn't understand the part about "2.0.0-1.15 (features and patches)". After 1.16, only patches are accepted for 1.2.0-1.15. It should be clearly defined how to bump up the connector's version number for the new Flink version.
If the connector major number were always bumped, it would make less sense to use the Flink version as a postfix. With the same example, it should be:
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 or 1.17, whichever happens first)
- 1.2.1-1.15 (new patches)
- 1.3.0-1.16 (feature and patches)
- 1.4.0-1.16 (feature and patches, new features)
- 2.0.0-1.16 (feature and patches, major upgrade of the connector itself)
or
- 1.2.0-1.14 (patches)
- 1.2.0-1.15 (feature and patches)
- 2.0.0-1.15 (feature and patches, major upgrade of the connector itself)
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 2.0.0-1.15 (patches)
- 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)
- 2.2.0-1.16 (feature and patches, new features)
I.e. commonly, there should be no connector major version change when using the Flink version postfix as the version schema. Special cases (which rarely happen) are obviously allowed. Best regards, Jing

On Tue, Sep 20, 2022 at 10:57 AM Martijn Visser wrote: Hi all, This is a tough topic; I also had to write things down a couple of times. To summarize and add my thoughts: a) I think everyone is agreeing that "Only the last 2 versions of a connector are supported per supported Flink version, with only the latest version receiving new features". In the current
[jira] [Created] (FLINK-29355) SQL parsing fails because DESC catalog.database.table is incorrectly parsed as DESC catalog
Yunhong Zheng created FLINK-29355: - Summary: SQL parsing fails because DESC catalog.database.table is incorrectly parsed as DESC catalog Key: FLINK-29355 URL: https://issues.apache.org/jira/browse/FLINK-29355 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.17.0 Reporter: Yunhong Zheng Fix For: 1.17.0 If a user names the catalog they use 'catalog' and tries to describe a table using the syntax 'describe catalog.testDatabase.testTable', this statement will be incorrectly parsed as 'DESC CATALOG' instead of 'DESC TABLE'. !image-2022-09-20-20-00-19-478.png|width=592,height=187! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Externalized connector release details
Hi, Thanks for starting this discussion. It is an interesting one and yeah, it is a tough topic. It seems like a centralized release version schema control for decentralized connector development ;-) In general, I like this idea, not because it is a good one but because there might be no better one (that's life!). The solution gives users an easy life at the price of extra effort on the developers' part. But it is a chicken-and-egg situation, i.e. developer friendly vs. user friendly. If it is hard for developers to move forward, it will also be difficult for users to get a new release, even if the version schema is user friendly. I'd like to raise some questions/concerns to make sure we are on the same page. @Chesnay

c1) Imagine we have 2.0.0 for 1.15:
- 2.0.0-1.14 (patches)
- 2.0.0-1.15 (feature and patches)
===> new major release targeting 1.16, and we need to change code for the new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)
There is no more 2.1.0-1.15 because only the latest version is receiving new features.

b1) Even if in some special cases we need to break the rule, we should avoid confusing users:
===> new major release targeting 1.16, and we need to change code for the new API
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.1.0-1.16 (feature and patches)
===> now we want to break the rule to add features to the penultimate version
- 2.0.0-1.14 (no support)
- 2.0.0-1.15 (patches)
- 2.2.0-1.15 (patches, new features) // 2.1.0-1.15 vs. 2.2.0-1.15; we have to choose 2.2.0-1.15 to avoid conflict
- 2.1.0-1.16 (feature and patches)
We have two options, 2.1.0-1.15 vs. 2.2.0-1.15, and both will confuse users:
- Using 2.1.0-1.15 will conflict with the existing 2.1.0-1.16. The connector version of "2.1.0-1.16" is actually 2.1.0, which means it has the same code as 2.1.0-1.15, but in this case it contains upgraded code.
- Using 2.2.0-1.15 will skip 2.1.0-1.15. Actually, it needs to skip all occupied minor-1.16 versions; heads-up, release manager!

c2) Allow me to use your example:
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 or 1.17, whichever happens first)
- 2.0.0-1.15 (feature and patches)
- 2.0.0-1.16 (feature and patches)
I didn't understand the part about "2.0.0-1.15 (features and patches)". After 1.16, only patches are accepted for 1.2.0-1.15. It should be clearly defined how to bump up the connector's version number for the new Flink version. If the connector major number were always bumped, it would make less sense to use the Flink version as a postfix. With the same example, it should be:
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 or 1.17, whichever happens first)
- 1.2.1-1.15 (new patches)
- 1.3.0-1.16 (feature and patches)
- 1.4.0-1.16 (feature and patches, new features)
- 2.0.0-1.16 (feature and patches, major upgrade of the connector itself)
or
- 1.2.0-1.14 (patches)
- 1.2.0-1.15 (feature and patches)
- 2.0.0-1.15 (feature and patches, major upgrade of the connector itself)
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 2.0.0-1.15 (patches)
- 2.0.1-1.15 (new patches)
- 2.1.0-1.16 (feature and patches)
- 2.2.0-1.16 (feature and patches, new features)
I.e. commonly, there should be no connector major version change when using the Flink version postfix as the version schema.
Special cases (which rarely happen) are obviously allowed. Best regards, Jing

On Tue, Sep 20, 2022 at 10:57 AM Martijn Visser wrote:
> Hi all,
> This is a tough topic; I also had to write things down a couple of times. To summarize and add my thoughts:
> a) I think everyone is agreeing that "Only the last 2 versions of a connector are supported per supported Flink version, with only the latest version receiving new features". In the current situation, that means that Flink 1.14 and Flink 1.15 would be supported for connectors. This results in a maximum of 4 supported connector versions.
> b1) In an ideal world, I would have liked Flink's APIs that are used by connectors to be versioned (that's why there's now a Sink V1 and a Sink V2). However, we're not there yet.
> b2) With regards to the remark of using @Internal APIs, one thing that we agreed to in previous discussions is that connectors shouldn't need to rely on @Internal APIs, so that the connector surface also stabilizes.
> b3) In the end, I think what matters the most is the user's perception of versioning. So the first thing to establish would be the versioning for connectors itself, i.e. the connector would have its own version scheme.
> Next is the compatibility of that scheme with a version of Flink. I do like Chesnay's approach of using the Scala suffixes idea. So you would have <connector-version>_<flink-version>. In the currently externalized
[jira] [Created] (FLINK-29354) Support TO_DATE and TO_TIMESTAMP built-in functions in the Table API
Luning Wang created FLINK-29354: --- Summary: Support TO_DATE and TO_TIMESTAMP built-in functions in the Table API Key: FLINK-29354 URL: https://issues.apache.org/jira/browse/FLINK-29354 Project: Flink Issue Type: Improvement Affects Versions: 1.16.0 Reporter: Luning Wang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29353) Support UNIX_TIMESTAMP built-in function in Table API
Luning Wang created FLINK-29353: --- Summary: Support UNIX_TIMESTAMP built-in function in Table API Key: FLINK-29353 URL: https://issues.apache.org/jira/browse/FLINK-29353 Project: Flink Issue Type: Improvement Components: API / Python, Table SQL / API Affects Versions: 1.16.0 Reporter: Luning Wang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29352) Support CONVERT_TZ built-in function in Table API
Luning Wang created FLINK-29352: --- Summary: Support CONVERT_TZ built-in function in Table API Key: FLINK-29352 URL: https://issues.apache.org/jira/browse/FLINK-29352 Project: Flink Issue Type: Improvement Components: API / Python, Table SQL / API Affects Versions: 1.16.0 Reporter: Luning Wang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29351) Enable input buffer floating for blocking shuffle
Yingjie Cao created FLINK-29351: --- Summary: Enable input buffer floating for blocking shuffle Key: FLINK-29351 URL: https://issues.apache.org/jira/browse/FLINK-29351 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.17.0 At the input gate, Flink needs exclusive buffers for each input channel. For jobs with large parallelism, this can easily cause an "Insufficient number of network buffers" error. This ticket aims to make all input network buffers floating for blocking shuffle, to reduce the possibility of the "Insufficient number of network buffers" error. This change can also improve default blocking shuffle performance, because buffer floating increases buffer utilization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction
Fine by me. Thanks for driving this Lincoln :) Best, Piotrek

On Tue, Sep 20, 2022 at 09:06, Lincoln Lee wrote:
> Hi all,
> I'll start a vote if there are no more objections till this Thursday (9.22). Looking forward to your feedback!
> [1] FLIP-260: https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> [2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc
> Best,
> Lincoln Lee
>
> Lincoln Lee wrote on Mon, Sep 19, 2022 at 17:38:
> > Hi Jingsong,
> > Thank you for participating in this discussion! For the method name, I think we should follow the new finish() method in `StreamOperator`; the BoundedOneInput might be removed in the future, as discussed [1] before.
> > [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
> > Best,
> > Lincoln Lee
> >
> > Jingsong Li wrote on Mon, Sep 19, 2022 at 10:13:
> >> +1 to add the `finish()` method to `TableFunction` only.
> >> Can we use `endInput` just like `BoundedOneInput`?
> >> Best,
> >> Jingsong
> >>
> >> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee wrote:
> >> > Hi Dawid, Piotr,
> >> > Agree with you that we add the finish() method to `TableFunction` only. The other `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`, `AggregateTableFunction`) do not necessarily need the finish method (they cannot emit records in the legacy close() method).
> >> > A `TableFunction` is used to correlate with the left table/stream; the following example shows a case where the user only selects columns from the correlated 'FeatureTF' (no left table column was selected):
> >> > ```
> >> > SELECT feature1, feature2, feature3
> >> > FROM MyTable t1
> >> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2, feature3) ON TRUE
> >> > ```
> >> > The 'FeatureTF' can do some flushing work in the legacy close() method, and this doesn't break any SQL semantics, so I don't see any reason to forbid users from doing flushing work in the new finish() method. I've updated the FLIP doc to limit the change to `TableFunction` only [1].
> >> > For the more powerful `ProcessFunction`, I'd like to share some thoughts: there indeed exist requirements for advanced usage in Table/SQL, even a further UD-Operator, e.g., a UD-Join for user-controlled join logic which cannot simply be expressed in SQL. This is an interesting topic; expect more discussions on this.
> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> >> > Best,
> >> > Lincoln Lee
> >> >
> >> > Piotr Nowojski wrote on Thu, Sep 15, 2022 at 22:39:
> >> > > Hi Dawid, Lincoln,
> >> > > I would tend to agree with Dawid. It seems to me like `TableFunction` is the one that needs to be taken care of. Other types of `UserDefinedFunction` wouldn't be able to emit anything from the `finish()` even if we added it. And if we added `finish(Collector out)` to them, it would create the same problems (how to pass the output type) that prevented us from adding `finish()` to all functions in the DataStream API.
> >> > > However I'm not sure what should be the long term solution for the Table API.
> >> > > For the DataStream API we wanted to provide a new, better and more powerful `ProcessFunction` for all of the unusual use cases that currently require the use of the `StreamOperator` API instead of `DataStream` functions. I don't know what the alternative would be in the Table API.
> >> > > Dawid, who do you think we should ping from the Table API/SQL teams to chip in?
> >> > > Best,
> >> > > Piotrek
> >> > >
> >> > > On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz wrote:
> >> > > > Hey Lincoln,
> >> > > > Thanks for opening the discussion.
> >> > > > To be honest I am not convinced that emitting from close() is a contract that was envisioned and thus should be maintained. As far as I can see it affects only the TableFunction, because it has the collect method. None of the other UDFs (ScalarFunction, AggregateFunction) have means to emit records from close().
> >> > > > To be honest I am not sure what the consequences would be of the interplay with other operators which expect TableFunction to emit only when eval is called. Not sure if there are such.
> >> > > > If it is a thing that we are certain we want to support, I'd be much more comfortable adding finish() to the TableFunction instead. Would be happy to hear opinions from the Table API folks.
> >> > > > Best,
> >> > > >
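To make the flushing pattern discussed in this thread concrete, here is an illustrative sketch (not taken from the FLIP or the PoC; the buffering logic and threshold are invented):
```
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

import java.util.ArrayList;
import java.util.List;

// Illustrative only: a table function that buffers rows in eval() and emits
// the remainder on teardown -- the contract whose fate is debated above.
public class FeatureTF extends TableFunction<String> {

    private transient List<String> buffer;

    @Override
    public void open(FunctionContext context) throws Exception {
        buffer = new ArrayList<>();
    }

    public void eval(String f0, String f1) {
        buffer.add(f0 + "|" + f1);
        if (buffer.size() >= 100) {
            flush();
        }
    }

    private void flush() {
        buffer.forEach(this::collect); // emit buffered rows downstream
        buffer.clear();
    }

    @Override
    public void close() throws Exception {
        // Legacy behavior: flushing the tail here; FLIP-260 proposes a
        // dedicated finish() for exactly this work.
        flush();
    }
}
```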
Re: [DISCUSS] Externalized connector release details
Hi all, This is a tough topic; I also had to write things down a couple of times. To summarize and add my thoughts:

a) I think everyone is agreeing that "Only the last 2 versions of a connector are supported per supported Flink version, with only the latest version receiving new features". In the current situation, that means that Flink 1.14 and Flink 1.15 would be supported for connectors. This results in a maximum of 4 supported connector versions.

b1) In an ideal world, I would have liked Flink's APIs that are used by connectors to be versioned (that's why there's now a Sink V1 and a Sink V2). However, we're not there yet.

b2) With regards to the remark of using @Internal APIs, one thing that we agreed to in previous discussions is that connectors shouldn't need to rely on @Internal APIs, so that the connector surface also stabilizes.

b3) In the end, I think what matters the most is the user's perception of versioning. So the first thing to establish would be the versioning for connectors itself, i.e. the connector would have its own version scheme. Next is the compatibility of that scheme with a version of Flink. I do like Chesnay's approach of using the Scala suffixes idea. So you would have <connector-version>_<flink-version>. In the currently externalized Elasticsearch connector, we would end up with 3.0.0_1.14 and 3.0.0_1.15 as first released versions. If a new Flink version were released that doesn't require code changes to the connector, the released version would be 3.0.0_1.16. That means that there have been no connector code changes (no patches, no new features) when comparing this across different Flink versions.

b4) Now using the example that Chesnay provided (yet slightly modified to match it with the Elasticsearch example I've used above), there exists an Elasticsearch connector 3.0.0_1.15. Now in Flink 1.16, there's a new API that we want to use, which is a test util. It would result in version 3.1.0_1.16 for the new Flink version. Like Chesnay said, for the sake of argument, at the same time we also had some pending changes for the 1.15 connector (let's say exclusive to 1.15; some workaround for a bug or smth), so we would also end up with 3.1.0_1.15. I agree with Danny that we should avoid this situation: the perception of the user would be that there's no divergence between the 3.1.0 versions, except for the compatible Flink version. I really am wondering how often we will run into that situation. From what I've seen so far with connectors, bug fixes always end up in both the release branch and the master branch. The only exceptions are test stability or documentation fixes, but if we only resolve these, they wouldn't need to be released. If such a special occasion were to occur, I would be inclined to go for a hotfix approach, where you would end up with 3.0.0.1_1.15.

c) Branch-wise, I think we should end up with <connector-version>_<flink-version>. So again with the Elasticsearch example, at this moment there would be 3.0.0_1.14 and 3.0.0_1.15 branches.

Best regards, Martijn
Re: [DISCUSS] Externalized connector release details
c) @Ryan: I'm generally fine with leaving it up to the connector how to implement Flink version-specific behavior, so long as the branching model stays consistent (primarily so that the release process is identical and we can share infrastructure).

@Danny: In a single-branch-per-version model we should update the version when making a change specific to a given Flink version. That a change for the 1.16 code can affect the 1.15 version is a price I'm willing to pay. If you use separate branches this becomes more complicated (surprise!), because then you likely end up with diverging code-bases using the same version. Because either a) you have 1.0.0 for both with diverging Flink version-specific code, or b) you create 1.1.0-1.16, but then what of 1.1.0-1.15? Do you skip 1.1.0 for 1.15? That would just be strange.

c1-3) I don't think we need to limit ourselves to only adding patches for Flink n-1. In my mind it would progress like this:
- 1.0.0-1.14 (features & patches)
===> 1.15 is released, let's support it; adding some version-specific code
- 1.1.0-1.14 (feature and patches)
- 1.1.0-1.15 (feature and patches)
===> general improvements to the v1 connector
- 1.2.0-1.14 (feature and patches)
- 1.2.0-1.15 (feature and patches)
===> new major release targeting 1.16
- 1.2.0-1.14 (no support; unsupported Flink version)
- 1.2.0-1.15 (patches; supported until either 3.0 or 1.17, whichever happens first)
- 2.0.0-1.15 (feature and patches)
- 2.0.0-1.16 (feature and patches)

d) Does the social aspect actually require a "main" branch, or just a _default_ branch on which development actually takes place? Unless we develop a habit of creating a significant number of major releases per year, we could use the branch of the latest connector version as the default.

On 16/09/2022 20:03, Danny Cranmer wrote: c) I am torn here. I do not like the idea that the connector code could diverge for the same connector version, i.e. 2.1.0-1.15 and 2.1.0-1.16. If the Flink version change requires a change to the connector code, then this should result in a new connector version in my opinion. Going back to your example of API changes, this means we could end up supporting new features for multiple Flink versions with potentially different @Internal implementations. Where do we draw the line here? Hypothetically, if the @Internal Flink APIs change substantially but do not impact the public connector interface, this could still end up with the same connector version. Example:
- 1.0.0-1.14 (uses @Internal connector API v1)
- 1.0.0-1.15 (uses @Internal connector API v2)
But on the flip side, this example is very good ("let's say exclusive to 1.15; some workaround for a bug or smth"), and I am not sure how the single-branch-per-version approach would solve it.

c1) We would likely need to do something like this:
- 1.0.0-1.14 (patches)
- 1.1.0-1.15 (contains some 1.15 bugfix) (feature and patches)

c2) When 1.16 is released, we would need to remove the 1.15 bugfix, and therefore change the connector code:
- 1.0.0-1.14
- 1.1.0-1.15 (contains some 1.15 bugfix) (patches)
- 1.2.0-1.16 (features and patches)

c3) But following this approach I can see a case where we end up with a version gap to support Flink (n-1) patches:
- 1.0.0-1.14
- 1.1.0-1.15 (contains some 1.15 bugfix) (patches)
- 1.2.0-1.16
- 1.3.0-1.16
- 1.4.0-1.16 (patches)
- 1.5.0-1.16 (features and patches)

I think we need some animations to help with these!

d) Agree with the "Socially" point generally.
Thanks, On Fri, Sep 16, 2022 at 1:30 PM Ryan Skraba wrote: I had to write down a diagram to fully understand the discussion :D If I recall correctly, during the externalization discussion, the "price to pay" for the (many) advantages of taking connectors out of the main repo was to maintain and manually consult a compatibility matrix per connector. I'm not a fan of that approach, and your example of diverging code between 2.1.0-1.15 and 2.1.0-1.16 is a good reason why. b) I think your proposal here is a viable alternative. c) In my experience, the extra effort of correctly cherry-picking to the "right" branches adds a small burden to each commit and release event. The biggest challenge will be for committers for each connector to be mindful of which versions are "still in the game" (but this is also true for the compatibility matrix approach). Two major versions of connectors multiplied by two versions of Flink is up to three cherry-picks per commit -- plus one if the connector is currently being migrated and exists simultaneously inside and outside the main repo, plus another for the previous still-supported version of flink. It's going to take some education effort! Weighing in on the shim approach: this might be something to leave up to each connector -- I can see it being easier or more relevant for some connectors than others to use dedicated branches versus dedicated modules per flink version, and this might evolve with the
[jira] [Created] (FLINK-29350) Add a note for swapping planner jar in Hive dependencies page
luoyuxia created FLINK-29350: Summary: Add a note for swapping planner jar in Hive dependencies page Key: FLINK-29350 URL: https://issues.apache.org/jira/browse/FLINK-29350 Project: Flink Issue Type: Improvement Components: Connectors / Hive, Documentation Affects Versions: 1.16.0 Reporter: luoyuxia Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29349) Use state ttl instead of timer to clean up state in proctime over aggregate
lincoln lee created FLINK-29349: --- Summary: Use state ttl instead of timer to clean up state in proctime over aggregate Key: FLINK-29349 URL: https://issues.apache.org/jira/browse/FLINK-29349 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.15.2, 1.16.0 Reporter: lincoln lee Fix For: 1.17.0 Currently we rely on timer-based state cleaning in the proctime over aggregate; this can be optimized to use state TTL, which is more efficient. -- This message was sent by Atlassian Jira (v8.20.10#820010)
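For reference, state TTL is enabled on the state descriptor itself; a minimal sketch with illustrative names (this is not the actual over-aggregate operator code):
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlSketch {
    public static void main(String[] args) {
        // TTL lets the state backend expire entries lazily instead of
        // registering one cleanup timer per key.
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.hours(1)) // retention is illustrative
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
        ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("accState", Long.class);
        desc.enableTimeToLive(ttlConfig);
    }
}
{code}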
[jira] [Created] (FLINK-29348) The DPP (dynamic partition pruning) cannot work with adaptive batch scheduler
Lijie Wang created FLINK-29348: -- Summary: The DPP(dynamic partition pruning) can not work with adaptive batch scheduler Key: FLINK-29348 URL: https://issues.apache.org/jira/browse/FLINK-29348 Project: Flink Issue Type: Bug Reporter: Lijie Wang When running tpcds with both DPP(dynamic partition pruning) and adaptive batch scheduler enabled, q14a.sql fails due to the following exception: {code:java} 2022-09-20 10:34:18,244 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job q14a.sql (6d4355bdde514be083b9762e286626d2) switched from state FAILING to FAILED. org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:299) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:125) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1031) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:588) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] at sun.reflect.GeneratedMethodAccessor69.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_332] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_332] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT] at
Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction
Hi all, I'll start a vote if there are no more objections till this Thursday (9.22). Looking forward to your feedback!

[1] FLIP-260: https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
[2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc

Best,
Lincoln Lee

Lincoln Lee wrote on Mon, Sep 19, 2022 at 17:38:
> Hi Jingsong,
> Thank you for participating in this discussion! For the method name, I think we should follow the new finish() method in `StreamOperator`; the BoundedOneInput might be removed in the future, as discussed [1] before.
> [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
> Best,
> Lincoln Lee
>
> Jingsong Li wrote on Mon, Sep 19, 2022 at 10:13:
>> +1 to add the `finish()` method to `TableFunction` only.
>> Can we use `endInput` just like `BoundedOneInput`?
>> Best,
>> Jingsong
>>
>> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee wrote:
>> > Hi Dawid, Piotr,
>> > Agree with you that we add the finish() method to `TableFunction` only. The other `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`, `AggregateTableFunction`) do not necessarily need the finish method (they cannot emit records in the legacy close() method).
>> > A `TableFunction` is used to correlate with the left table/stream; the following example shows a case where the user only selects columns from the correlated 'FeatureTF' (no left table column was selected):
>> > ```
>> > SELECT feature1, feature2, feature3
>> > FROM MyTable t1
>> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2, feature3) ON TRUE
>> > ```
>> > The 'FeatureTF' can do some flushing work in the legacy close() method, and this doesn't break any SQL semantics, so I don't see any reason to forbid users from doing flushing work in the new finish() method. I've updated the FLIP doc to limit the change to `TableFunction` only [1].
>> > For the more powerful `ProcessFunction`, I'd like to share some thoughts: there indeed exist requirements for advanced usage in Table/SQL, even a further UD-Operator, e.g., a UD-Join for user-controlled join logic which cannot simply be expressed in SQL. This is an interesting topic; expect more discussions on this.
>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
>> > Best,
>> > Lincoln Lee
>> >
>> > Piotr Nowojski wrote on Thu, Sep 15, 2022 at 22:39:
>> > > Hi Dawid, Lincoln,
>> > > I would tend to agree with Dawid. It seems to me like `TableFunction` is the one that needs to be taken care of. Other types of `UserDefinedFunction` wouldn't be able to emit anything from the `finish()` even if we added it. And if we added `finish(Collector out)` to them, it would create the same problems (how to pass the output type) that prevented us from adding `finish()` to all functions in the DataStream API.
>> > > However I'm not sure what should be the long term solution for the Table API. For the DataStream API we wanted to provide a new, better and more powerful `ProcessFunction` for all of the unusual use cases that currently require the use of the `StreamOperator` API instead of `DataStream` functions. I don't know what the alternative would be in the Table API.
>> > > Dawid, who do you think we should ping from the Table API/SQL teams to chip in?
>> > > Best,
>> > > Piotrek
>> > >
>> > > On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz wrote:
>> > > > Hey Lincoln,
>> > > > Thanks for opening the discussion.
>> > > > To be honest I am not convinced that emitting from close() is a contract that was envisioned and thus should be maintained. As far as I can see it affects only the TableFunction, because it has the collect method. None of the other UDFs (ScalarFunction, AggregateFunction) have means to emit records from close().
>> > > > To be honest I am not sure what the consequences would be of the interplay with other operators which expect TableFunction to emit only when eval is called. Not sure if there are such.
>> > > > If it is a thing that we are certain we want to support, I'd be much more comfortable adding finish() to the TableFunction instead. Would be happy to hear opinions from the Table API folks.
>> > > > Best,
>> > > > Dawid
>> > > >
>> > > > On 14/09/2022 15:55, Lincoln Lee wrote:
>> > > > > Thanks @Piotr for your valuable inputs!
>> > > > > I did a quick read of the previous discussion you mentioned; it seems my FLIP title doesn't give a clear scope here and causes some confusion. If my understanding is correct, the UDFs in your context is the user
[jira] [Created] (FLINK-29347) Failed to restore from list state with empty protobuf object
shen created FLINK-29347: Summary: Failed to restore from list state with empty protobuf object Key: FLINK-29347 URL: https://issues.apache.org/jira/browse/FLINK-29347 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.15.0, 1.14.2 Reporter: shen I use a protobuf-generated class in a union list state. When my Flink job restores from a checkpoint, I get this exception: {code:java} Caused by: java.lang.RuntimeException: Could not create class com.MY_PROTOBUF_GENERATED_CLASS at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) ~[my-lib-0.1.1-SNAPSHOT.jar:?] at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[my-lib-0.1.1-SNAPSHOT.jar:?] at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left. at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.14.4.jar:1.14.4] at