[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779364#comment-17779364 ]

Yun Tang commented on FLINK-33355:
----------------------------------

I think this is because you forgot to set a uid for each operator. Since the `windowAll` operator can only run with parallelism 1, all operators will chain together once you change the parallelism to 1. Please assign operator IDs as described in the docs: https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/savepoints/#assigning-operator-ids

> can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-33355
>                 URL: https://issues.apache.org/jira/browse/FLINK-33355
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>         Environment: flink 1.17.1
>            Reporter: zhang
>            Priority: Major
>
> If the program includes operators with windows, it is not possible to reduce the parallelism of the operators from n to 1 when restarting from a savepoint; it will result in an error:
> {code:java}
> // IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 with max parallelism 128 to new program with max parallelism 1. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint. {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
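Yun Tang's explanation can be illustrated with a small, Flink-free sketch. Savepoint state is restored by operator ID; without explicit `uid(...)` calls the ID is derived from the topology, so a chaining change at parallelism 1 re-derives different IDs. The ID scheme below is purely illustrative and is NOT Flink's actual hashing algorithm:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of savepoint restore. Not Flink's real ID algorithm: it only
// illustrates that topology-derived operator IDs change when chaining
// changes, while explicit uids stay stable.
public class OperatorIdDemo {

    // Stand-in for an auto-generated operator ID: depends on chain layout.
    static String autoId(String chainLayout, int position) {
        return chainLayout + "#" + position;
    }

    public static void main(String[] args) {
        // Savepoint taken at parallelism n: source and windowAll sit in
        // separate chains; their state is stored under auto-generated IDs.
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put(autoId("source|window", 0), "source state (maxParallelism 128)");
        savepoint.put(autoId("source|window", 1), "window state (maxParallelism 1)");

        // Restart at parallelism 1: everything chains into one task, the
        // layout changes, and the auto IDs no longer match the savepoint.
        System.out.println(savepoint.get(autoId("source+window", 0)));

        // With explicit uids (e.g. .uid("cdc-source"), .uid("window") on
        // each operator) the lookup key is independent of chaining.
        Map<String, String> withUids = new HashMap<>();
        withUids.put("cdc-source", "source state");
        withUids.put("window", "window state");
        System.out.println(withUids.get("window"));
    }
}
```

The names `cdc-source` and `window` are made up; the point is only that an explicit, stable key survives topology changes while a derived one does not.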
[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779358#comment-17779358 ]

Yun Tang commented on FLINK-33355:
----------------------------------

[~edmond_j] How did you assign the parallelism, by setting the configuration option `parallelism.default`?
[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779359#comment-17779359 ]

zhang commented on FLINK-33355:
-------------------------------

[~yunta] yes
Re: [PR] [FLINK-32107] [Tests] Kubernetes test failed because ofunable to establish ssl connection to github on AZP [flink]
victor9309 commented on PR #23528: URL: https://github.com/apache/flink/pull/23528#issuecomment-1778610263

Thanks @XComp for the review. I tested that the next attempt is executed when a download fails.

> can you double-check that wget works properly (i.e. returns a non-zero exit code) if accessing the website fails? ...to make the retry logic work.

![image](https://github.com/apache/flink/assets/18453843/f5b19771-5ef0-4e78-bdd7-fee151e7ccb6)

The only parameter left to modify is the download command to execute, which makes it more intuitive.

> shouldn't we be able to generalize it even more? We could move the actual download logic into this function as well. The only parameter that should be passed would be the URL of the artifact. WDYT?

![image](https://github.com/apache/flink/assets/18453843/73b328ce-9eda-453b-bb68-d100fa490178)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
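The review point about wget's exit code matters because a retry loop can only observe failure through it. A generic sketch of such a retry wrapper (illustrative, not the actual code in PR #23528; `retry_times` is a made-up name):

```shell
#!/usr/bin/env bash
# Illustrative retry wrapper: re-runs a command until it succeeds or the
# attempt budget is exhausted. It relies entirely on the command's exit
# code, which is why wget must return non-zero when the download fails.
retry_times() {
  local max_attempts=$1
  shift
  local attempt
  for attempt in $(seq 1 "$max_attempts"); do
    if "$@"; then
      return 0
    fi
    echo "Attempt ${attempt}/${max_attempts} failed: $*" >&2
  done
  return 1
}
```

Usage would look like `retry_times 5 wget -q "$artifact_url"`, where `$artifact_url` stands in for whatever URL the e2e test downloads.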
[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779352#comment-17779352 ]

zhang commented on FLINK-33355:
-------------------------------

[~yunta]
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SqlServerSourceBuilder.SqlServerIncrementalSource cdcSource = ...;
DataStreamSource cdcStream = env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "cdc");
cdcStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .trigger(new CustomTrigger<>())
        .apply(new CustomFunction())
        .print();
env.execute();
{code}
For the job above, if I reduce the parallelism from n to 1, I will encounter the previously mentioned error.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SqlServerSourceBuilder.SqlServerIncrementalSource cdcSource = ...;
DataStreamSource cdcStream = env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "cdc");
cdcStream.print();
env.execute();
{code}
But doing this doesn't result in an error, so I suspect it is caused by the window operators.
[PR] Update iteration.md | Spelling [flink-ml]
as1605 opened a new pull request, #257: URL: https://github.com/apache/flink-ml/pull/257

Minor spelling and grammar change

## What is the purpose of the change

Fixes spelling errors in the documentation

## Brief change log

- `tarnsmitted` to `transmitted`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
Re: [PR] add announcement blog post for Flink 1.18 [flink-web]
luoyuxia commented on code in PR #680: URL: https://github.com/apache/flink-web/pull/680#discussion_r1371212872

## docs/content/posts/2023-10-10-release-1.18.0.md:

@@ -0,0 +1,572 @@
+---
+authors:
+- JingGe:
+    name: "Jing Ge"
+    twitter: jingengineer
+- KonstantinKnauf:
+    name: "Konstantin Knauf"
+    twitter: snntrable
+- SergeyNuyanzin:
+    name: "Sergey Nuyanzin"
+    twitter: uckamello
+- QingshengRen:
+    name: "Qingsheng Ren"
+    twitter: renqstuite
+date: "2023-10-10T08:00:00Z"
+subtitle: ""
+title: Announcing the Release of Apache Flink 1.18
+aliases:
+- /news/2023/10/10/release-1.18.0.html
+---
+
+The Apache Flink PMC is pleased to announce the release of Apache Flink 1.18.0. As usual, we are looking at a packed
+release with a wide variety of improvements and new features. Overall, 174 people contributed to this release completing
+18 FLIPS and 700+ issues. Thank you!
+
+Let's dive into the highlights.
+
+# Towards a Streaming Lakehouse
+
+## Flink SQL Improvements
+
+### Introduce Flink JDBC Driver For SQL Gateway
+
+Flink 1.18 comes with a JDBC Driver for the Flink SQL Gateway. So, you can now use any SQL Client that supports JDBC to
+interact with your tables via Flink SQL. Here is an example using [SQLLine](https://julianhyde.github.io/sqlline/manual.html).
+
+```shell
+sqlline> !connect jdbc:flink://localhost:8083
+```
+
+```shell
+sqlline version 1.12.0
+sqlline> !connect jdbc:flink://localhost:8083
+Enter username for jdbc:flink://localhost:8083:
+Enter password for jdbc:flink://localhost:8083:
+0: jdbc:flink://localhost:8083> CREATE TABLE T(
+. . . . . . . . . . . . . . .)>   a INT,
+. . . . . . . . . . . . . . .)>   b VARCHAR(10)
+. . . . . . . . . . . . . . .)> ) WITH (
+. . . . . . . . . . . . . . .)>   'connector' = 'filesystem',
+. . . . . . . . . . . . . . .)>   'path' = 'file:///tmp/T.csv',
+. . . . . . . . . . . . . . .)>   'format' = 'csv'
+. . . . . . . . . . . . . . .)> );
+No rows affected (0.122 seconds)
+0: jdbc:flink://localhost:8083> INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello');
++----------------------------------+
+|              job id              |
++----------------------------------+
+| fbade1ab4450fc57ebd5269fdf60dcfd |
++----------------------------------+
+1 row selected (1.282 seconds)
+0: jdbc:flink://localhost:8083> SELECT * FROM T;
++---+-------+
+| a |   b   |
++---+-------+
+| 1 |    Hi |
+| 2 | Hello |
++---+-------+
+2 rows selected (1.955 seconds)
+0: jdbc:flink://localhost:8083>
+```
+
+**More Information**
+* [Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/jdbcdriver/)
+* [FLIP-293: Introduce Flink Jdbc Driver For SQL Gateway](https://cwiki.apache.org/confluence/display/FLINK/FLIP-293%3A+Introduce+Flink+Jdbc+Driver+For+Sql+Gateway)
+
+
+### Stored Procedure Support for Flink Connectors
+
+Stored procedures have been an indispensable tool in traditional databases,
+offering a convenient way to encapsulate complex logic for data manipulation
+and administrative tasks. They also offer the potential for enhanced
+performance, since they can trigger the handling of data operations directly
+within an external database. Other popular data systems like Trino and Iceberg
+automate and simplify common maintenance tasks into small sets of procedures,
+which greatly reduces users' administrative burden.
+
+This new update primarily targets developers of Flink connectors, who can now
+predefine custom stored procedures into connectors via the Catalog interface.
+The primary benefit to users is that connector-specific tasks that previously
+may have required writing custom Flink code can now be replaced with simple
+calls that encapsulate, standardize, and potentially optimize the underlying
+operations. Users can execute procedures using the familiar `CALL` syntax, and
+discover a connector's available procedures with `SHOW PROCEDURES`. Stored
+procedures within connectors improve the extensibility of Flink's SQL and
+Table APIs, and should unlock smoother data access and management for users.
+
+**More Information**
+* [Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/procedures/)
+* [FLIP-311: Support Call Stored Procedure](https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure)
+
+### Extended DDL Support
+
+From this release onwards, Flink supports
+
+- `REPLACE TABLE AS SELECT`
+- `CREATE OR REPLACE TABLE AS SELECT`
+
+and both these commands, and the previously supported `CREATE TABLE AS`, can now support atomicity provided the underlying
+connector also supports this.
+
+Moreover, Apache Flink now supports TRUNCATE TABLE in batch execution mode. Same as before, the underlying connector needs
+to implement and provide this capability.
+
+And, finally, we have also implemented support for adding, dropping and listing partitions via
+
+- `ALTER TABLE ADD PARTITION`
+- `ALTER TABLE DROP PARTITION`
+- `SHOW P
Re: [PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]
caicancai commented on PR #688: URL: https://github.com/apache/flink-kubernetes-operator/pull/688#issuecomment-177853

![image](https://github.com/apache/flink-kubernetes-operator/assets/77189278/817ec9f0-eb8a-4ea3-bb56-99a4f3131b43)

I ran `mvn clean install -DskipTests -Pgenerate-docs`. There are also some changes to the files, which I don't know whether I need to upload.
Re: [PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]
tisonkun commented on PR #688: URL: https://github.com/apache/flink-kubernetes-operator/pull/688#issuecomment-1778582314

```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Flink Kubernetes: 1.7-SNAPSHOT:
[INFO]
[INFO] Flink Kubernetes: .................................. SUCCESS [  5.258 s]
[INFO] Flink Kubernetes Standalone ........................ SUCCESS [ 23.994 s]
[INFO] Flink Kubernetes Operator Api ...................... SUCCESS [ 35.975 s]
[INFO] Flink Autoscaler ................................... SUCCESS [ 19.100 s]
[INFO] Flink Kubernetes Operator .......................... SUCCESS [06:56 min]
[INFO] Flink Kubernetes Webhook ........................... SUCCESS [ 16.950 s]
[INFO] Flink Kubernetes Docs .............................. SUCCESS [ 14.774 s]
[INFO] Flink SQL Runner Example ........................... SUCCESS [  7.986 s]
[INFO] Flink Beam Example ................................. SUCCESS [ 43.310 s]
[INFO] Flink Kubernetes Client Code Example ............... SUCCESS [ 10.857 s]
[INFO] Flink Autoscaler Test Job .......................... SUCCESS [  8.115 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10:03 min
[INFO] Finished at: 2023-10-25T06:11:12Z
[INFO] ------------------------------------------------------------------------
Please generate the java doc via 'mvn clean install -DskipTests -Pgenerate-docs' again
Error: Process completed with exit code 1.
```

Succeeded but failed - why? cc @gyfora
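My reading of that log: the Maven build itself succeeded, but a later step regenerates the docs and fails the job when the regenerated files differ from what is committed, hence the "Please generate the java doc ... again" message followed by exit code 1. A sketch of that kind of check (an assumption about the workflow, not the operator repo's actual script):

```shell
#!/usr/bin/env bash
# Sketch of a "generated docs are committed" check: after regenerating
# (e.g. via mvn clean install -DskipTests -Pgenerate-docs), fail when the
# working tree differs from HEAD, even though the build itself succeeded.
check_docs_committed() {
  if ! git diff --quiet; then
    echo "Please generate the java doc via 'mvn clean install -DskipTests -Pgenerate-docs' again" >&2
    return 1
  fi
}
```

Under this pattern, "succeeded but failed" simply means the compile passed while the working-tree diff check did not.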
Re: [PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]
caicancai commented on PR #688: URL: https://github.com/apache/flink-kubernetes-operator/pull/688#issuecomment-1778580734

I am working on this CI issue.
[jira] [Commented] (FLINK-33356) The navigation bar on Flink’s official website is messed up.
[ https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779345#comment-17779345 ]

Lijie Wang commented on FLINK-33356:
------------------------------------

[~Wencong Liu] Assigned to you.

> The navigation bar on Flink’s official website is messed up.
> ------------------------------------------------------------
>
>                 Key: FLINK-33356
>                 URL: https://issues.apache.org/jira/browse/FLINK-33356
>             Project: Flink
>          Issue Type: Bug
>          Components: Project Website
>            Reporter: Junrui Li
>            Assignee: Wencong Liu
>            Priority: Major
>         Attachments: image-2023-10-25-11-55-52-653.png, image-2023-10-25-12-34-22-790.png
>
> The side navigation bar on the Flink official website at the following link: [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed up, as shown in the attached screenshot.
> !image-2023-10-25-11-55-52-653.png!
[jira] [Assigned] (FLINK-33356) The navigation bar on Flink’s official website is messed up.
[ https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lijie Wang reassigned FLINK-33356:
----------------------------------

Assignee: Wencong Liu
[jira] [Commented] (FLINK-33356) The navigation bar on Flink’s official website is messed up.
[ https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779341#comment-17779341 ]

Junrui Li commented on FLINK-33356:
-----------------------------------

[~Wencong Liu] Thank you for volunteering. [~wanglijie], could you help assign this ticket to Wencong?
[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779340#comment-17779340 ]

Yun Tang commented on FLINK-33355:
----------------------------------

[~edmond_j] could you please share the code to reproduce this problem?
[jira] [Comment Edited] (FLINK-33356) The navigation bar on Flink’s official website is messed up.
[ https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779330#comment-17779330 ]

Wencong Liu edited comment on FLINK-33356 at 10/25/23 5:58 AM:
---------------------------------------------------------------

Hello [~JunRuiLi], I found this case is caused by commit "30e8b3de05c1d6b75d8f27b9188a1d34f1589ac5", which modified the subproject commit. I think we should revert this change. Could you assign it to me?

!image-2023-10-25-12-34-22-790.png!
[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779336#comment-17779336 ]

zhang commented on FLINK-33355:
-------------------------------

[~yunta] I know that modifying the max parallelism is not possible, but I only changed the parallelism via the {*}parallelism.default{*} parameter. In my tests, reducing it from n to m (where 1 < m < n) works. However, reducing it to 1 results in the aforementioned error. Nevertheless, I have a requirement to reduce the parallelism of a Flink job from n to 1.
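For context on the "128" in the error: to my understanding, when max parallelism is not set explicitly, Flink derives it from the operator parallelism with a lower bound of 128, which is why a job started at modest parallelism records max parallelism 128 in the savepoint, while a forced-parallelism-1 operator like `windowAll` has max parallelism 1. A self-contained sketch of that derivation (mirroring my reading of `KeyGroupRangeAssignment.computeDefaultMaxParallelism`; treat the exact formula as an assumption):

```java
public class DefaultMaxParallelism {

    static int roundUpToPowerOfTwo(int x) {
        int v = 1;
        while (v < x) {
            v <<= 1;
        }
        return v;
    }

    // Assumed derivation: 1.5x the operator parallelism, rounded up to a
    // power of two, clamped to the range [128, 32768].
    static int defaultMaxParallelism(int operatorParallelism) {
        int candidate = roundUpToPowerOfTwo(operatorParallelism + operatorParallelism / 2);
        return Math.min(Math.max(candidate, 128), 32768);
    }

    public static void main(String[] args) {
        // Any parallelism up to ~85 still yields the lower bound of 128,
        // matching the "max parallelism 128" in the reported error.
        for (int p : new int[] {1, 4, 16, 200}) {
            System.out.println(p + " -> " + defaultMaxParallelism(p));
        }
    }
}
```

Since downscaling below the recorded max parallelism is normally allowed, n to 1 should work by itself; per Yun Tang's comment, the failure comes from state being mapped to the wrong operator after chaining changes, not from the downscale.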
Re: [PR] [FLINK-30768] [Project Website] flink-web version cleanup [flink-web]
victor9309 commented on PR #683: URL: https://github.com/apache/flink-web/pull/683#issuecomment-1778542189

Thanks @XComp for the review, and thank you very much for your advice.

```
$ cat foo-utils.sh
function foo() {
  echo 'foo...' >&2
  exit 1
}

$ cat foo-main.sh
source ./foo-utils.sh
v=$(foo)
echo "$v"
echo "end-"
```

![image](https://github.com/apache/flink-web/assets/18453843/a83ad090-1e5b-4862-8ca1-c9328ebe76dc)

I found that `exit 1` inside the function cannot exit the script, so I modified the logic; please check it:

```
$ cat foo-utils.sh
export TOP_PID=$$
trap 'exit 1' TERM

function foo() {
  echo 'foo...' >&2
  kill -s TERM $TOP_PID
}

$ cat foo-main.sh
source ./foo-utils.sh
v=$(foo)
echo "$v"
echo "end-"
```

![image](https://github.com/apache/flink-web/assets/18453843/be4816f5-19fb-40ef-9916-6543cb3ba1df)
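The subtlety here is that `$(foo)` runs `foo` in a subshell, so an `exit 1` inside it only terminates that subshell while `foo-main.sh` continues; the TERM-trap above signals the parent process instead. An alternative sketch (assuming the helper can `return` a status rather than exit) is to check the command substitution's exit status in the caller:

```shell
#!/usr/bin/env bash
# The command substitution's exit status becomes the status of the
# assignment, so the caller can detect the failure itself.
foo() {
  echo 'foo...' >&2
  return 1
}

main() {
  local v
  v=$(foo)  # note: 'local v=$(foo)' would mask foo's exit status
  if [ $? -ne 0 ]; then
    echo "foo failed, aborting" >&2
    return 1
  fi
  echo "$v"
  echo "end-"
}
```

Whether this fits depends on whether the utility functions are allowed to `return` instead of `exit`; the trap approach keeps the "abort the whole script" semantics.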
[jira] [Assigned] (FLINK-33357) add Apache Software License 2
[ https://issues.apache.org/jira/browse/FLINK-33357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zili Chen reassigned FLINK-33357:
---------------------------------

Assignee: 蔡灿材

> add Apache Software License 2
> -----------------------------
>
>                 Key: FLINK-33357
>                 URL: https://issues.apache.org/jira/browse/FLINK-33357
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.6.0
>            Reporter: 蔡灿材
>            Assignee: 蔡灿材
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: kubernetes-operator-1.5.0
>
>         Attachments: 2023-10-25 12-08-58屏幕截图.png
>
> Currently flinkdeployments.flink.apache.org-v1.yml and flinksessionjobs.flink.apache.org-v1.yml don't add the Apache Software License 2 header.
[jira] [Created] (FLINK-33358) Flink SQL Client fails to start in Flink on YARN
Prabhu Joseph created FLINK-33358:
-------------------------------------

             Summary: Flink SQL Client fails to start in Flink on YARN
                 Key: FLINK-33358
                 URL: https://issues.apache.org/jira/browse/FLINK-33358
             Project: Flink
          Issue Type: Bug
          Components: Deployment / YARN, Table SQL / Client
    Affects Versions: 1.18.0
            Reporter: Prabhu Joseph

Flink SQL Client fails to start in Flink on YARN with the below error:

{code:java}
flink-yarn-session -tm 2048 -s 2 -d
/usr/lib/flink/bin/sql-client.sh

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Could not read from command line.
	at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221)
	at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:179)
	at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:121)
	at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:114)
	at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169)
	at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118)
	at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228)
	at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.table.client.config.SqlClientOptions
	at org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59)
	at org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633)
	at org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615)
	at org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554)
	at org.jline.reader.impl.LineReaderImpl.doCleanup(LineReaderImpl.java:2340)
	at org.jline.reader.impl.LineReaderImpl.cleanup(LineReaderImpl.java:2332)
	at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:626)
	at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:194)
	... 7 more
{code}

The issue is due to the old jline jar from the Hadoop classpath (/usr/lib/hadoop-yarn/lib/jline-3.9.0.jar) taking precedence. Flink 1.18 requires jline-3.21.0.jar. Placing flink-sql-client.jar (which bundles jline-3.21) before the Hadoop classpath fixes the issue.

{code:java}
diff --git a/flink-table/flink-sql-client/bin/sql-client.sh b/flink-table/flink-sql-client/bin/sql-client.sh
index 24746c5dc8..4ab8635de2 100755
--- a/flink-table/flink-sql-client/bin/sql-client.sh
+++ b/flink-table/flink-sql-client/bin/sql-client.sh
@@ -89,7 +89,7 @@ if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then
 elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then

     # start client with jar
-    exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"
+    exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$FLINK_SQL_CLIENT_JAR:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"

 # write error message to stderr
 else
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-33358) Flink SQL Client fails to start in Flink on YARN
[ https://issues.apache.org/jira/browse/FLINK-33358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Prabhu Joseph updated FLINK-33358: -- Description: Flink SQL Client fails to start in Flink on YARN with below error {code:java} flink-yarn-session -tm 2048 -s 2 -d /usr/lib/flink/bin/sql-client.sh Exception in thread "main" org.apache.flink.table.client.SqlClientException: Could not read from command line. at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221) at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:179) at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:121) at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:114) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118) at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179) Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.table.client.config.SqlClientOptions at org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59) at org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633) at org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615) at org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554) at org.jline.reader.impl.LineReaderImpl.doCleanup(LineReaderImpl.java:2340) at org.jline.reader.impl.LineReaderImpl.cleanup(LineReaderImpl.java:2332) at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:626) at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:194) ... 
7 more {code} The issue is due to the old jline jar from Hadoop (3.3.3) classpath (/usr/lib/hadoop-yarn/lib/jline-3.9.0.jar) taking first precedence. Flink-1.18 requires jline-3.21.0.jar. Placing flink-sql-client.jar (bundled with jline-3.21) before the Hadoop classpath fixes the issue. {code:java} diff --git a/flink-table/flink-sql-client/bin/sql-client.sh b/flink-table/flink-sql-client/bin/sql-client.sh index 24746c5dc8..4ab8635de2 100755 --- a/flink-table/flink-sql-client/bin/sql-client.sh +++ b/flink-table/flink-sql-client/bin/sql-client.sh @@ -89,7 +89,7 @@ if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then # start client with jar -exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`" +exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$FLINK_SQL_CLIENT_JAR:$INTERNAL_HADOOP_CLASSPATHS`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`" # write error message to stderr else {code} was: Flink SQL Client fails to start in Flink on YARN with below error {code:java} flink-yarn-session -tm 2048 -s 2 -d /usr/lib/flink/bin/sql-client.sh Exception in thread "main" org.apache.flink.table.client.SqlClientException: Could not read from command line. 
at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221) at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:179) at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:121) at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:114) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118) at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179) Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.table.client.config.SqlClientOptions at org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59) at org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633) at org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615) at org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554) at org.jline.reader.impl.LineReaderImpl.doCleanup(LineReaderImpl.java:2340) at org.jline.rea
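The classpath-ordering failure described above can be sketched in a few lines: JVM classpath resolution is first-match-wins, so whichever jar appears first supplies the jline classes. The following is an illustrative Python simulation, not Flink or JVM code; the `provides` mapping is a hypothetical stand-in for real jar contents, with entry names taken from the report.

```python
# Simulate first-match-wins classpath resolution to show why jar order matters.
def resolve(classpath, provides, class_name):
    """Return the first classpath entry that provides class_name."""
    for entry in classpath:
        if class_name in provides.get(entry, set()):
            return entry
    raise LookupError(class_name)

# Both jars ship the same class name, but at different jline versions.
provides = {
    "/usr/lib/hadoop-yarn/lib/jline-3.9.0.jar": {"org.jline.reader.impl.LineReaderImpl"},
    "flink-sql-client.jar": {"org.jline.reader.impl.LineReaderImpl"},  # bundles jline 3.21
}

# Before the patch: Hadoop classpath first -> stale jline 3.9 wins.
hadoop_first = ["/usr/lib/hadoop-yarn/lib/jline-3.9.0.jar", "flink-sql-client.jar"]
# After the patch: flink-sql-client.jar moved ahead -> bundled jline 3.21 wins.
flink_first = ["flink-sql-client.jar", "/usr/lib/hadoop-yarn/lib/jline-3.9.0.jar"]

print(resolve(hadoop_first, provides, "org.jline.reader.impl.LineReaderImpl"))
print(resolve(flink_first, provides, "org.jline.reader.impl.LineReaderImpl"))
```

This is why the one-line reordering in the `sql-client.sh` diff is sufficient: no jar is removed, only the search order changes.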
[jira] [Commented] (FLINK-33356) The navigation bar on Flink’s official website is messed up.
[ https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779330#comment-17779330 ] Wencong Liu commented on FLINK-33356: - Hello [~JunRuiLi] , I found this case is due to the commit "30e8b3de05c1d6b75d8f27b9188a1d34f1589ac5", which modified the subproject commit. I think we should revert this change !image-2023-10-25-12-34-22-790.png! > The navigation bar on Flink’s official website is messed up. > > > Key: FLINK-33356 > URL: https://issues.apache.org/jira/browse/FLINK-33356 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Junrui Li >Priority: Major > Attachments: image-2023-10-25-11-55-52-653.png, > image-2023-10-25-12-34-22-790.png > > > The side navigation bar on the Flink official website at the following link: > [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed > up, as shown in the attached screenshot. > !image-2023-10-25-11-55-52-653.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33356) The navigation bar on Flink’s official website is messed up.
[ https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wencong Liu updated FLINK-33356: Attachment: image-2023-10-25-12-34-22-790.png > The navigation bar on Flink’s official website is messed up. > > > Key: FLINK-33356 > URL: https://issues.apache.org/jira/browse/FLINK-33356 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Junrui Li >Priority: Major > Attachments: image-2023-10-25-11-55-52-653.png, > image-2023-10-25-12-34-22-790.png > > > The side navigation bar on the Flink official website at the following link: > [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed > up, as shown in the attached screenshot. > !image-2023-10-25-11-55-52-653.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]
yigress commented on PR #23425: URL: https://github.com/apache/flink/pull/23425#issuecomment-1778479885 @pnowojski I rebased and it kicked off a rerun successfully. I also ran a job for a day without problems. If it looks good, can you help merge it too? Thank you so much! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

[jira] [Updated] (FLINK-33357) add Apache Software License 2
[ https://issues.apache.org/jira/browse/FLINK-33357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33357: --- Labels: pull-request-available (was: ) > add Apache Software License 2 > - > > Key: FLINK-33357 > URL: https://issues.apache.org/jira/browse/FLINK-33357 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0 >Reporter: 蔡灿材 >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.5.0 > > Attachments: 2023-10-25 12-08-58屏幕截图.png > > > Currently flinkdeployments.flink.apache.org-v1.yml and > flinksessionjobs.flink.apache.org-v1.yml do not > include the Apache Software License 2 header -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]
caicancai opened a new pull request, #688: URL: https://github.com/apache/flink-kubernetes-operator/pull/688 ## What is the purpose of the change *(For example: This pull request adds a new feature to periodically create and maintain savepoints through the `FlinkDeployment` custom resource.)* ## Brief change log *(for example:)* - *Periodic savepoint trigger is introduced to the custom resource* - *The operator checks on reconciliation whether the required time has passed* - *The JobManager's dispose savepoint API is used to clean up obsolete savepoints* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / no) - Core observer or reconciler logic that is regularly executed: (yes / no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33357) add Apache Software License 2
蔡灿材 created FLINK-33357: --- Summary: add Apache Software License 2 Key: FLINK-33357 URL: https://issues.apache.org/jira/browse/FLINK-33357 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.6.0 Reporter: 蔡灿材 Fix For: kubernetes-operator-1.5.0 Attachments: 2023-10-25 12-08-58屏幕截图.png Currently flinkdeployments.flink.apache.org-v1.yml and flinksessionjobs.flink.apache.org-v1.yml do not include the Apache Software License 2 header -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779315#comment-17779315 ] Yun Tang commented on FLINK-33355: -- Changing the max-parallelism (instead of parallelism), would break the checkpoint compatibility, which is built by design. You can refer to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism for more details. > can't reduce the parallelism from 'n' to '1' when recovering through a > savepoint. > - > > Key: FLINK-33355 > URL: https://issues.apache.org/jira/browse/FLINK-33355 > Project: Flink > Issue Type: Bug > Components: API / Core > Environment: flink 1.17.1 >Reporter: zhang >Priority: Major > > If the program includes operators with window, it is not possible to reduce > the parallelism of the operators from n to 1 when restarting from a > savepoint, and it will result in an error: > {code:java} > //IllegalStateException: Failed to rollback to checkpoint/savepoint > Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint > state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 > with max parallelism 128 to new program with max parallelism 1. This > indicates that the program has been changed in a non-compatible way after the > checkpoint/savepoint. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
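The comment above explains why max parallelism (unlike parallelism) is baked into checkpoint compatibility. A minimal sketch of Flink's key-group mapping (cf. `KeyGroupRangeAssignment` in the linked docs) makes the distinction concrete; the hash step here is a stand-in for Flink's murmur hash, so treat the details as illustrative.

```python
# Keyed state is partitioned into max_parallelism key groups at write time;
# each operator subtask owns a contiguous range of those groups.
def key_group_for(key_hash: int, max_parallelism: int) -> int:
    # Flink applies a murmur hash before the modulo; omitted here for brevity.
    return key_hash % max_parallelism

def operator_index_for(key_group: int, max_parallelism: int, parallelism: int) -> int:
    return key_group * parallelism // max_parallelism

max_par = 128
# Rescaling parallelism 4 -> 1 only remaps the SAME 128 key groups:
groups_p4 = {operator_index_for(g, max_par, 4) for g in range(max_par)}
groups_p1 = {operator_index_for(g, max_par, 1) for g in range(max_par)}
print(sorted(groups_p4))  # [0, 1, 2, 3]
print(sorted(groups_p1))  # [0]
# But a savepoint written with max_par=128 cannot be restored with max_par=1:
# key groups 1..127 would have no valid home, hence the
# "Max parallelism mismatch" IllegalStateException in the report.
```

In other words, scaling parallelism down to 1 is fine as long as the new program keeps the original max parallelism (128 here); the error in this ticket indicates the max parallelism itself changed.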
[jira] [Closed] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
[ https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-33355. Resolution: Information Provided > can't reduce the parallelism from 'n' to '1' when recovering through a > savepoint. > - > > Key: FLINK-33355 > URL: https://issues.apache.org/jira/browse/FLINK-33355 > Project: Flink > Issue Type: Bug > Components: API / Core > Environment: flink 1.17.1 >Reporter: zhang >Priority: Major > > If the program includes operators with window, it is not possible to reduce > the parallelism of the operators from n to 1 when restarting from a > savepoint, and it will result in an error: > {code:java} > //IllegalStateException: Failed to rollback to checkpoint/savepoint > Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint > state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 > with max parallelism 128 to new program with max parallelism 1. This > indicates that the program has been changed in a non-compatible way after the > checkpoint/savepoint. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33356) The navigation bar on Flink’s official website is messed up.
Junrui Li created FLINK-33356: - Summary: The navigation bar on Flink’s official website is messed up. Key: FLINK-33356 URL: https://issues.apache.org/jira/browse/FLINK-33356 Project: Flink Issue Type: Bug Components: Project Website Reporter: Junrui Li Attachments: image-2023-10-25-11-55-52-653.png The side navigation bar on the Flink official website at the following link: [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed up, as shown in the attached screenshot. !image-2023-10-25-11-55-52-653.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33316][runtime] Avoid unnecessary heavy getStreamOperatorFactory [flink]
1996fanrui commented on code in PR #23550: URL: https://github.com/apache/flink/pull/23550#discussion_r1371104358 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java: ## @@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator operator) { public void setStreamOperatorFactory(StreamOperatorFactory factory) { if (factory != null) { -toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory); +toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory); +config.setString(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName()); Review Comment: Hi @pnowojski , thanks for your analysis! > I would move: SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName) > check, into the boolean StreamConfig#isSinkWriterOperatorFactory(Class<...> ...) method. > It doesn't fit there very well, BUT at least it would justify why we have the checkState in the > StreamConfig#setStreamOperatorFactory. The solution1 (`SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)` ) is still fragile, right? When `makes SinkWriterOperatorFactory non final and implement a subclass` in the future, it still cannot support, and silently. > toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory); config.setString(IS_INSTANCE_OF_SinkWriterOperatorFactory, factory instance of SinkWriterOperatorFactory); > would be better/cleaner. Either one is fine for me. The solution2 is perfectly compatible with the case of adding sub class. However, as I said before, `the getStreamOperatorFactory is called in the toString to print the class name.`, and I'd like to using the `SERIALIZED_UDF_CLASS_NAME` instead of `getStreamOperatorFactory`. If we just keep `IS_INSTANCE_OF_SinkWriterOperatorFactory`, we must call `getStreamOperatorFactory` in the `toString` method. Or we add the `IS_INSTANCE_OF_SinkWriterOperatorFactory` and `SERIALIZED_UDF_CLASS_NAME` together? 
Actually, I have solution3 before I create this PR: We store the `SERIALIZED_UDF_CLASS` instead of `SERIALIZED_UDF_CLASS_NAME`. ``` # setStreamOperatorFactory method toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory); toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass()); ``` ``` public > Class getStreamOperatorFactoryClass(ClassLoader cl) { try { return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF_CLASS, cl); } catch (Exception e) { throw new StreamTaskException("Could not instantiate chained outputs.", e); } } ``` And check `isAssignableFrom`: ``` SinkWriterOperatorFactory.class.isAssignableFrom(getStreamOperatorFactoryClass(SinkWriterOperatorFactory.class.getClassLoader())); ``` The solution3 is fine, however, I'm worried that when there are multiple classloaders, the judgment may be wrong. That's why this PR store the ClassName instead of Class. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
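The fragility being debated above — class-name string comparison (solution 1) versus an instanceof-style flag recorded at serialization time (solution 2) — can be shown with a toy Python stand-in for the Java code. Class names are illustrative only; this is not Flink code.

```python
# Solution 1 compares the stored class NAME; solution 2 records an instance
# check up front. Only the latter survives subclassing the factory.
class SinkWriterOperatorFactory:
    pass

# Hypothetical future subclass (the "makes SinkWriterOperatorFactory non final"
# scenario discussed in the thread).
class CustomSinkWriterOperatorFactory(SinkWriterOperatorFactory):
    pass

factory = CustomSinkWriterOperatorFactory()

# Solution 1: name equality -- silently wrong for the subclass.
name_matches = type(factory).__name__ == "SinkWriterOperatorFactory"
# Solution 2: instanceof-style check recorded when the config is written.
is_sink_writer_factory = isinstance(factory, SinkWriterOperatorFactory)

print(name_matches, is_sink_writer_factory)  # False True
```

This mirrors why solution 2 (or solution 3's `isAssignableFrom` on the stored class) is "perfectly compatible with the case of adding a sub class," while the plain name comparison fails without any error.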
Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] The io.disk package of flink-runtime module [flink]
Jiabao-Sun commented on PR #23572: URL: https://github.com/apache/flink/pull/23572#issuecomment-1778455547 Hi @RocMarshal, please help review it when you have time. Thanks :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] Module: The io package of flink-runtime [flink]
RocMarshal commented on PR #23200: URL: https://github.com/apache/flink/pull/23200#issuecomment-1778448738 > This PR is too huge to review. I will split it into multiple PRs. SGTM +1. Looking forward to it~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.
zhang created FLINK-33355: - Summary: can't reduce the parallelism from 'n' to '1' when recovering through a savepoint. Key: FLINK-33355 URL: https://issues.apache.org/jira/browse/FLINK-33355 Project: Flink Issue Type: Bug Components: API / Core Environment: flink 1.17.1 Reporter: zhang If the program includes operators with window, it is not possible to reduce the parallelism of the operators from n to 1 when restarting from a savepoint, and it will result in an error: {code:java} //IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 with max parallelism 128 to new program with max parallelism 1. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33354) Reuse the TaskInformation for multiple slots
Rui Fan created FLINK-33354: --- Summary: Reuse the TaskInformation for multiple slots Key: FLINK-33354 URL: https://issues.apache.org/jira/browse/FLINK-33354 Project: Flink Issue Type: Sub-task Components: Runtime / Task Affects Versions: 1.17.1, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan The background is similar to FLINK-33315: a Hive table with a lot of data, where HiveSource#partitionBytes is 281MB. When slotPerTM = 4, one TM runs 4 HiveSources at the same time. How does the TaskExecutor submit a large task? # TaskExecutor#loadBigData reads all bytes from the file into a SerializedValue ** The SerializedValue holds a byte[] ** It costs heap memory ** It will be greater than 281 MB, because it stores not only HiveSource#partitionBytes but also the other information in TaskInformation. # Generate the TaskInformation from the SerializedValue ** TaskExecutor#submitTask calls tdd.getSerializedTaskInformation().deserializeValue() ** tdd.getSerializedTaskInformation() is a SerializedValue ** It generates the TaskInformation ** TaskInformation includes the Configuration {color:#9876aa}taskConfiguration{color} ** The {color:#9876aa}taskConfiguration{color} includes StreamConfig#{color:#9876aa}SERIALIZEDUDF{color} {color:#172b4d}Based on the above process, TM memory holds 2 big byte arrays for each task:{color} * {color:#172b4d}The SerializedValue{color} * {color:#172b4d}The TaskInformation{color} When one TM runs 4 HiveSources at the same time, it holds 8 big byte arrays. In our production environment, this is also a situation that often leads to TM OOM. h2. Solution: This data is identical across slots because the PermanentBlobKey is the same. We can add a cache for it to reduce the memory and CPU cost. -- This message was sent by Atlassian Jira (v8.20.10#820010)
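The proposed fix — cache the deserialized value per PermanentBlobKey so that slots running the same task share one copy — can be sketched as follows. This is an illustrative Python sketch; names like `TaskInformationCache` are hypothetical, not actual Flink classes.

```python
# Cache deserialized task information per blob key: four HiveSource subtasks on
# one TM then share a single copy instead of deserializing four.
class TaskInformationCache:
    def __init__(self, deserialize):
        self._deserialize = deserialize  # expensive: reads the blob, builds the object
        self._cache = {}

    def get(self, blob_key):
        if blob_key not in self._cache:
            self._cache[blob_key] = self._deserialize(blob_key)
        return self._cache[blob_key]

loads = []  # track how many expensive deserializations actually happen
cache = TaskInformationCache(lambda key: loads.append(key) or f"task-info-{key}")

# Four slots submitting the same task trigger only one deserialization:
infos = [cache.get("blob-42") for _ in range(4)]
print(len(loads), len(set(infos)))  # 1 1
```

A production version would also need eviction (e.g. release entries when the blob is deleted) and thread safety, which this sketch omits.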
[jira] [Comment Edited] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner
[ https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779300#comment-17779300 ] Xin Chen edited comment on FLINK-26603 at 10/25/23 2:58 AM: [~luoyuxia] Ok,thank you very much. :D(y). I will test all with flink-table-planner. was (Author: JIRAUSER298666): [~luoyuxia] Ok,thank you very much. :D(y) > [Umbrella] Decouple Hive with Flink planner > --- > > Key: FLINK-26603 > URL: https://issues.apache.org/jira/browse/FLINK-26603 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Planner >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Fix For: 1.18.0 > > > To support Hive dialect with Flink, we have implemented FLIP-123, FLIP-152. > But it also brings much maintenance burden and complexity for it mixes some > logic specific to Hive with Flink planner. We should remove such logic from > Flink planner and make it totally decouple with Flink planner. > With this ticket, we expect: > 1: there won't be any specific logic to Hive in planner module > 2: remove flink-sql-parser-hive from flink-table module > 3: remove the planner dependency in flink-connector-hive > I'll update more details after investigation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner
[ https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779300#comment-17779300 ] Xin Chen commented on FLINK-26603: -- [~luoyuxia] Ok,thank you very much. :D(y) > [Umbrella] Decouple Hive with Flink planner > --- > > Key: FLINK-26603 > URL: https://issues.apache.org/jira/browse/FLINK-26603 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Planner >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Fix For: 1.18.0 > > > To support Hive dialect with Flink, we have implemented FLIP-123, FLIP-152. > But it also brings much maintenance burden and complexity for it mixes some > logic specific to Hive with Flink planner. We should remove such logic from > Flink planner and make it totally decouple with Flink planner. > With this ticket, we expect: > 1: there won't be any specific logic to Hive in planner module > 2: remove flink-sql-parser-hive from flink-table module > 3: remove the planner dependency in flink-connector-hive > I'll update more details after investigation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner
[ https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779299#comment-17779299 ] luoyuxia commented on FLINK-26603: -- [~xinchen147] Yes, you're right. Nothing special to table-planner-loader, it's just used to hide the table-planner(which depends on scala) in another classloader. There shouldn't be any problem even though you are not going to use hive dialect. > [Umbrella] Decouple Hive with Flink planner > --- > > Key: FLINK-26603 > URL: https://issues.apache.org/jira/browse/FLINK-26603 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Planner >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Fix For: 1.18.0 > > > To support Hive dialect with Flink, we have implemented FLIP-123, FLIP-152. > But it also brings much maintenance burden and complexity for it mixes some > logic specific to Hive with Flink planner. We should remove such logic from > Flink planner and make it totally decouple with Flink planner. > With this ticket, we expect: > 1: there won't be any specific logic to Hive in planner module > 2: remove flink-sql-parser-hive from flink-table module > 3: remove the planner dependency in flink-connector-hive > I'll update more details after investigation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] Module: The io package of flink-runtime [flink]
Jiabao-Sun closed pull request #23200: [FLINK-32850][flink-runtime][JUnit5 Migration] Module: The io package of flink-runtime URL: https://github.com/apache/flink/pull/23200 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33315) Optimize memory usage of large StreamOperator
[ https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-33315: Component/s: Runtime / Task > Optimize memory usage of large StreamOperator > - > > Key: FLINK-33315 > URL: https://issues.apache.org/jira/browse/FLINK-33315 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration, Runtime / Task >Affects Versions: 1.17.0, 1.18.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Attachments: > 130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b.png, > image-2023-10-19-16-28-16-077.png > > > Some of our batch jobs are upgraded from flink-1.15 to flink-1.17, and TM > always fail with java.lang.OutOfMemoryError: Java heap space. > > Here is a example: a hive table with a lot of data, and the > HiveSource#partitionBytes is 281MB. > After analysis, the root cause is that TM maintains the big object with 3 > replicas: > * Replica_1: SourceOperatorFactory (it's necessary for running task) > * Replica_2: Temporarily generate the duplicate SourceOperatorFactory object. > ** It's introduced in FLINK-30536 (1.17), it's not necessary. ([code > link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646]) > ** When creating a successor operator to a SourceOperator, the call stack is: > *** OperatorChain#createOperatorChain -> > *** wrapOperatorIntoOutput -> > *** getOperatorRecordsOutCounter -> > *** operatorConfig.getStreamOperatorFactory(userCodeClassloader) > ** It will generate the SourceOperatorFactory temporarily and just check > whether it's SinkWriterOperatorFactory > * Replica_3: The value of StreamConfig#{color:#9876aa}SERIALIZEDUDF {color} > ** It is used to generate SourceOperatorFactory. > ** Now the value is always maintained in heap memory. 
> ** However, after generating we can release it or store it in the disk if > needed. > *** We can define a threshold, when the value size is less than threshold, > the release strategy doesn't take effect. > ** If so, we can save a lot of heap memory. > These three replicas use about 800MB of memory. Please note that this is just > a subtask. Since each TM has 4 slots, it will run 4 HiveSources at the same > time, so 12 replicas are maintained in the TM memory, it's about 3.3 GB. > These large objects in the JVM cannot be recycled, causing TM to frequently > OOM. > This JIRA focus on optimizing Replica_2 and Replica_3. > > !image-2023-10-19-16-28-16-077.png! > > !https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93! > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
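The Replica_3 idea above — keep small serialized config values on the heap but release or spill large ones after the factory has been generated, guarded by a size threshold — can be sketched like this. The threshold value and all names here are illustrative assumptions, not the actual Flink design.

```python
# Keep payloads below a threshold in memory; spill larger ones to disk and
# drop the heap copy, re-reading from disk only if the value is needed again.
import os
import tempfile

THRESHOLD_BYTES = 1 << 20  # hypothetical 1 MB cutoff

class SerializedValueHolder:
    def __init__(self, payload: bytes):
        if len(payload) < THRESHOLD_BYTES:
            self._bytes, self._path = payload, None  # small: release strategy off
        else:
            fd, self._path = tempfile.mkstemp()
            with os.fdopen(fd, "wb") as f:
                f.write(payload)
            self._bytes = None  # heap copy released; large value lives on disk

    def read(self) -> bytes:
        if self._bytes is not None:
            return self._bytes
        with open(self._path, "rb") as f:
            return f.read()

small = SerializedValueHolder(b"x" * 10)
large = SerializedValueHolder(b"y" * (2 << 20))
print(small._bytes is not None, large._bytes is None)  # True True
print(len(large.read()))  # 2097152
```

For the 281MB HiveSource payload described in the ticket, this kind of strategy would trade a disk re-read on the rare second access for hundreds of megabytes of heap per slot.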
Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] Module: The io package of flink-runtime [flink]
Jiabao-Sun commented on PR #23200: URL: https://github.com/apache/flink/pull/23200#issuecomment-1778414947 This PR is too huge to review. I will split it into multiple PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner
[ https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779298#comment-17779298 ] Xin Chen commented on FLINK-26603: -- Hi, [~luoyuxia] Thank you. Yes, as the table-planner-loader module was introduced to decouple scala, I have no specific requirements for the scala version, so I am considering fully using the table-planner to replace table-planner-loader for all scenarios, including ones that do not involve the hive dialect. Please allow me to confirm again: what I am concerned about is the impact or risk, on other functionality or performance, of completely using this module when the hive dialect is not involved. I think it should be completely fine; after all, the table-planner-loader module actually uses the table-planner. The only difference may be that the classloader mechanism of the loader module is no longer used? Is there any problem with this? I think probably not > [Umbrella] Decouple Hive with Flink planner > --- > > Key: FLINK-26603 > URL: https://issues.apache.org/jira/browse/FLINK-26603 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Planner >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Fix For: 1.18.0 > > > To support Hive dialect with Flink, we have implemented FLIP-123, FLIP-152. > But it also brings much maintenance burden and complexity for it mixes some > logic specific to Hive with Flink planner. We should remove such logic from > Flink planner and make it totally decouple with Flink planner. > With this ticket, we expect: > 1: there won't be any specific logic to Hive in planner module > 2: remove flink-sql-parser-hive from flink-table module > 3: remove the planner dependency in flink-connector-hive > I'll update more details after investigation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33315) Optimize memory usage of large StreamOperator
[ https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-33315: Attachment: 130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b.png > Optimize memory usage of large StreamOperator > - > > Key: FLINK-33315 > URL: https://issues.apache.org/jira/browse/FLINK-33315 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.17.0, 1.18.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Attachments: > 130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b.png, > image-2023-10-19-16-28-16-077.png > > > Some of our batch jobs are upgraded from flink-1.15 to flink-1.17, and TM > always fail with java.lang.OutOfMemoryError: Java heap space. > > Here is a example: a hive table with a lot of data, and the > HiveSource#partitionBytes is 281MB. > After analysis, the root cause is that TM maintains the big object with 3 > replicas: > * Replica_1: SourceOperatorFactory (it's necessary for running task) > * Replica_2: Temporarily generate the duplicate SourceOperatorFactory object. > ** It's introduced in FLINK-30536 (1.17), it's not necessary. ([code > link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646]) > ** When creating a successor operator to a SourceOperator, the call stack is: > *** OperatorChain#createOperatorChain -> > *** wrapOperatorIntoOutput -> > *** getOperatorRecordsOutCounter -> > *** operatorConfig.getStreamOperatorFactory(userCodeClassloader) > ** It will generate the SourceOperatorFactory temporarily and just check > whether it's SinkWriterOperatorFactory > * Replica_3: The value of StreamConfig#{color:#9876aa}SERIALIZEDUDF {color} > ** It is used to generate SourceOperatorFactory. > ** Now the value is always maintained in heap memory. 
> ** However, after generating we can release it or store it in the disk if > needed. > *** We can define a threshold, when the value size is less than threshold, > the release strategy doesn't take effect. > ** If so, we can save a lot of heap memory. > These three replicas use about 800MB of memory. Please note that this is just > a subtask. Since each TM has 4 slots, it will run 4 HiveSources at the same > time, so 12 replicas are maintained in the TM memory, it's about 3.3 GB. > These large objects in the JVM cannot be recycled, causing TM to frequently > OOM. > This JIRA focus on optimizing Replica_2 and Replica_3. > > !image-2023-10-19-16-28-16-077.png! > > !https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93! > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33315) Optimize memory usage of large StreamOperator
[ https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-33315: Description: Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TMs always fail with java.lang.OutOfMemoryError: Java heap space. Here is an example: a Hive table with a lot of data, where HiveSource#partitionBytes is 281MB. After analysis, the root cause is that the TM maintains the big object in 3 replicas: * Replica_1: SourceOperatorFactory (necessary for running the task) * Replica_2: a temporarily generated duplicate SourceOperatorFactory object. ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646]) ** When creating a successor operator to a SourceOperator, the call stack is: *** OperatorChain#createOperatorChain -> *** wrapOperatorIntoOutput -> *** getOperatorRecordsOutCounter -> *** operatorConfig.getStreamOperatorFactory(userCodeClassloader) ** This generates the SourceOperatorFactory temporarily just to check whether it is a SinkWriterOperatorFactory. * Replica_3: the value of StreamConfig#SERIALIZED_UDF. ** It is used to generate the SourceOperatorFactory. ** Currently the value is always kept in heap memory. ** However, after generating the factory we could release it, or store it on disk if needed. *** We can define a threshold: when the value size is below the threshold, the release strategy does not take effect. ** If so, we can save a lot of heap memory. These three replicas use about 800MB of memory. Please note that this is just one subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same time, so 12 replicas are maintained in TM memory, about 3.3 GB. These large objects in the JVM cannot be recycled, causing the TM to frequently OOM.
This JIRA focuses on optimizing Replica_2 and Replica_3. !image-2023-10-19-16-28-16-077.png!
> Optimize memory usage of large StreamOperator > - > > Key: FLINK-33315 > URL: https://issues.apache.org/jira/browse/FLINK-33315 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration > Affects Versions: 1.17.0, 1.18.0 > Reporter: Rui Fan > Assignee: Rui Fan > Priority: Major > Attachments: image-2023-10-19-16-28-16-077.png
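The release strategy sketched for Replica_3 above (drop or spill the serialized UDF value once the factory has been generated, guarded by a size threshold) can be illustrated with a small, self-contained sketch. The names `ReleasableConfigValue` and `RELEASE_THRESHOLD` are hypothetical, and plain Java serialization stands in for Flink's StreamConfig internals:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

// Sketch of the Replica_3 idea: keep the serialized UDF value in heap only
// until the factory has been generated, then drop large payloads. The names
// ReleasableConfigValue and RELEASE_THRESHOLD are illustrative, not Flink API.
class ReleasableConfigValue {
    // Values smaller than this stay cached in heap; larger ones are released.
    private static final int RELEASE_THRESHOLD = 1024;

    private byte[] serialized;

    ReleasableConfigValue(byte[] serialized) {
        this.serialized = serialized;
    }

    /** Deserializes the value once, then releases large payloads. */
    Object deserializeAndMaybeRelease() throws IOException, ClassNotFoundException {
        if (serialized == null) {
            throw new IllegalStateException("value was already released");
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            Object value = in.readObject();
            if (serialized.length >= RELEASE_THRESHOLD) {
                serialized = null; // let GC reclaim the big byte[]
            }
            return value;
        }
    }

    boolean isReleased() {
        return serialized == null;
    }
}
```

With a 281MB partition list this kind of release would reclaim the heap after the single deserialization, while small config values stay cached and avoid repeated deserialization cost.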
Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]
afedulov commented on PR #23395: URL: https://github.com/apache/flink/pull/23395#issuecomment-1778150546 > That being said, @afedulov, do you think it's worthwhile bringing up the new feature on the mailing list to discuss? This was my initial thought, yes. Ideally we do not want to introduce functionality for very niche use cases, but this one makes sense to me, especially for building demos etc. Although this change, in my opinion, does not deserve a FLIP, I think it still makes sense to do a quick vote in the dev mailing list. The idea would be to prepend the topic with [VOTE], briefly describe the proposal, why it is useful and the downsides of it not being the best practice (Ryan's concerns). If no one comments - this is a silent yes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1778137755 @zentol thanks a lot for the review!! I addressed all comments from your first pass, PTAL.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370875073 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param <OUT> The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction<OUT> +implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer<OUT> serializer; + +/** The actual data elements, in serialized form.
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable<OUT> elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable<OUT> elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer<OUT> getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +ByteArrayInputSt
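The mechanism in the listing above — serialize the elements once at construction time, then emit by deserializing — can be reduced to a self-contained sketch. This uses plain Java serialization instead of Flink's `TypeSerializer` (so elements must implement `Serializable`), and the class name `FromElementsSketch` is illustrative only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Core idea of FromElementsGeneratorFunction, reduced to plain Java: elements
// are serialized into a byte[] once at construction time, so the function
// object itself stays serializable regardless of transport; emission re-reads
// the elements one by one. Java serialization stands in for Flink's
// TypeSerializer here; the class name is illustrative only.
class FromElementsSketch<T> {
    private final byte[] elementsSerialized;
    private final int numElements;

    FromElementsSketch(Iterable<T> elements) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int count = 0;
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            for (T element : elements) {
                out.writeObject(element);
                count++;
            }
        }
        this.elementsSerialized = baos.toByteArray();
        this.numElements = count;
    }

    /** Re-reads the serialized elements, mimicking the reader's emission loop. */
    @SuppressWarnings("unchecked")
    List<T> emitAll() throws IOException, ClassNotFoundException {
        List<T> result = new ArrayList<>(numElements);
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(elementsSerialized))) {
            for (int i = 0; i < numElements; i++) {
                result.add((T) in.readObject());
            }
        }
        return result;
    }
}
```

The design choice matters for `fromElements`: by eagerly converting elements to bytes with the element-type serializer, only a `byte[]` travels inside the function, so user element classes never need to be Java-serializable themselves.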
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370869459 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java: ## @@ -27,7 +27,10 @@ * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for * cases where the output type is specified by the returns method and, thus, after the stream * operator has been created. + * + * @deprecated Use {@link org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead */ +@Deprecated Review Comment: Nice trick :) Done https://github.com/apache/flink/pull/23553/commits/e0f20410d65ce9ddf5c674e54d89947d5c5ceca3
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java: ## @@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() { int maxParallelism = 42; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -DataStream input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128); -DataStream input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129); +DataStream input1 = env.fromSequence(1, 4).setMaxParallelism(128); +DataStream input2 = env.fromSequence(1, 4).setMaxParallelism(129); Review Comment: Yes, this fixes test failures that arise because of this: https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307 the SingleOutputStreamOperator caps max parallelism to 1. The current implementation of `fromSequence`, somewhat inconsistently, allows parallel execution.
Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]
snuyanzin commented on PR #23519: URL: https://github.com/apache/flink/pull/23519#issuecomment-1778108982 Thanks for the contribution. In general it looks OK from my side; I left a couple of minor comments.
Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]
snuyanzin commented on code in PR #23519: URL: https://github.com/apache/flink/pull/23519#discussion_r1370858120 ## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java: ## @@ -66,12 +70,17 @@ * * * - * Once Flink applies same logic for both table api and sql, this class should be removed. + * Once Flink applies same logic for both table api and sql, this first changes should be + * removed. + * + * 2. It uses PEEK_FIELDS_NO_EXPAND with a nested struct type (Flink [[RowType]]). Review Comment: ```suggestion * 2. It uses {@link PEEK_FIELDS_NO_EXPAND} with a nested struct type (Flink [[{@link RowType}]]). ``` i wonder whether usage of links could improve navigation in documentation
Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]
snuyanzin commented on code in PR #23519: URL: https://github.com/apache/flink/pull/23519#discussion_r1370857278 ## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java: ## @@ -66,12 +70,17 @@ * * * - * Once Flink applies same logic for both table api and sql, this class should be removed. + * Once Flink applies same logic for both table api and sql, this first changes should be + * removed. + * + * 2. It uses PEEK_FIELDS_NO_EXPAND with a nested struct type (Flink [[RowType]]). + * + * See more at {@code LogicalRelDataTypeConverter} and {@code FlinkTypeFactory}. Review Comment: ```suggestion * See more at {@link LogicalRelDataTypeConverter} and {@link FlinkTypeFactory}. ``` i guess it's better to use `@link` here
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757728 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java (on the javadoc line `NOTE: This source has a parallelism of 1.`) Review Comment: > Theoretically we could also fix this (in a follow-up!) but it doesn't seem worth the overhead given the number of elements. Oh I think this already works; see below comment. Good point, I mainly decided to restrict it because this is what the current underlying SourceFunction was delivering (FromElementsFunction). I am not sure if there were any specific additional considerations that required limiting it to a parallelism of 1.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370845998 ## flink-tests/pom.xml: ## @@ -284,6 +284,13 @@ under the License. test + + org.apache.flink + flink-avro Review Comment: > I cant really tell how this (and some of the flink-tests changes) related to fromElements. That's actually an easy one. You might remember this - https://issues.apache.org/jira/browse/FLINK-21386 . I added the test to explicitly verify that the issue you ran into with Avro and `fromElements` is handled correctly after the addition of the `OutputTypeConfigurable` functionality to `DataGeneratorSource`.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370841305 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java: ## @@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) { } private void createNonChainableStream(TableTestUtil util) { -DataStreamSource dataStream = util.getStreamEnv().fromElements(1, 2, 3); +DataStreamSource dataStream = +util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); Review Comment: The FLIP-27 source gets chained, while the test requires a non chainable stream (compare with `createChainableStream`). In this PR I dealt with it by switching to fromCollection which is still based on the `SourceFunction`. In the follow-up PR for the `fromCollection` migration I had to add a legacy source: https://github.com/apache/flink/pull/23558/files#diff-0e02bf442f990b526e7a5fe5203eff9e0d19924419b63d0bb0aa573f2b55R119 Not sure if we can get a `nonChainableStream` with a FLIP-27 source.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370832543 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism( configuration.set( PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM, chainingOfOperatorsWithDifferentMaxParallelismEnabled); -configuration.set(PipelineOptions.MAX_PARALLELISM, 10); +configuration.set(PipelineOptions.MAX_PARALLELISM, 1); try (StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)) { chainEnv.fromElements(1) .map(x -> x) // should automatically break chain here .map(x -> x) -.setMaxParallelism(1) +.setMaxParallelism(10) Review Comment: This verifies that the chain gets broken. The legacy source was not enforcing max parallelism set to 1, something that we do now by propagating the call to super (https://github.com/afedulov/flink/blob/cf1a29d47a5bb4fb92e98a36934e525d74bae17b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L208). Notice that the default max parallelism in the config also got changed above from 10 to 1. So now we start with the source with max parallelism of 1 and break the chain because the second map has parallelism of 10. Previously it was doing the same, but in "reverse".
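The single chaining condition exercised by the test above (equal max parallelism, unless the corresponding chaining option is enabled) can be stated as a tiny predicate. This is an illustration only; Flink's real check lives in StreamingJobGraphGenerator and covers many more conditions:

```java
// Illustrative predicate for the one aspect discussed above: adjacent
// operators may chain when their max parallelism matches, or when the option
// corresponding to OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM
// is enabled. Not Flink's actual chaining logic, which checks many more properties.
class ChainingCheck {
    static boolean maxParallelismAllowsChaining(
            int upstreamMaxParallelism,
            int downstreamMaxParallelism,
            boolean chainDifferentMaxParallelismEnabled) {
        return chainDifferentMaxParallelismEnabled
                || upstreamMaxParallelism == downstreamMaxParallelism;
    }
}
```

Under this reading, the updated test starts from a source capped at max parallelism 1 and a downstream map at 10, so with the option disabled the chain must break exactly at the annotated line.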
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370824409 ## flink-formats/flink-parquet/src/test/resources/avro/user.avsc: ## @@ -1,9 +0,0 @@ -{ - "namespace": "org.apache.flink.connector.datagen.source.generated", - "type": "record", - "name": "User", Review Comment: I added it accidentally to the wrong package in one of the earlier commits and this [tmp] commit just cleans it up. I got rid of both changes.
Re: [PR] [FLINK-33121] Failed precondition in JobExceptionsHandler due to concurrent global failures [flink]
pgaref commented on PR #23440: URL: https://github.com/apache/flink/pull/23440#issuecomment-1778042754 Hey @dmvk -- thanks for the comments! 1. Was debating about that, however testing e2e would require to add the mocked AdaptiveScheduler dependency (like `ExceptionHistoryTester`) to JobExceptionsHandlerTest -- that even though is possible, would add more complexity than value IMO as we just need to ensure the order of entries for the conversion is correct (similar to what we do in the AdaptiveSchedulerTest in the PR) 2. Regarding `testExceptionHistoryWithTaskConcurrentGlobalFailure`, two concurrent Global failures could never occur with the current logic in the AdaptiveScheduler -- so I modified the test to represent reality and include our case too 3. This issue is not related to failure enrichment at all -- it is just not too common as there should be one or more concurrent Task/Local failures followed by a Global one
Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]
dalelane commented on PR #23395: URL: https://github.com/apache/flink/pull/23395#issuecomment-1778020433 @flinkbot run azure
Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]
dalelane commented on PR #23395: URL: https://github.com/apache/flink/pull/23395#issuecomment-1777990170 @flinkbot run azure
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370765358 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1161,9 +1163,10 @@ void testYieldingOperatorChainableToTaskNotChainedToLegacySource() { */ @Test void testYieldingOperatorProperlyChainedOnLegacySources() { +// TODO: this test can be removed when the legacy SourceFunction API gets removed StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1); -chainEnv.fromElements(1) +chainEnv.addSource(new LegacySource()) Review Comment: Seems so. Has something to do with threading: ``` [FLINK-16219][runtime] Disallow chaining of legacy source and yielding operator. This change allows yielding operators to be eagerly chained whenever possible, except after legacy sources. Yielding operators do not properly work when processInput is called from another thread, but are usually fine in any other chain. ```
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370762519 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction +implements GeneratorFunction, OutputTypeConfigurable { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer serializer; + +/** The actual data elements, in serialized form. 
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer serializer, Iterable elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +ByteArrayInputSt
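The serialize-once / replay-on-emit pattern used by the quoted FromElementsGeneratorFunction can be sketched self-containedly. This is a plain-JDK sketch, not the actual Flink class: `DataOutputStream`/`DataInputStream` stand in for Flink's `TypeSerializer` with `DataOutputViewStreamWrapper`/`DataInputView`, and the class name is invented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the pattern in the quoted code: serialize all elements up front
// into a byte[], then deserialize them one by one as the source emits records.
public class FromElementsSketch {

    // Analogous to serializeElements(): write every element into a byte buffer.
    public static byte[] serializeElements(List<Integer> elements) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (int element : elements) {
                out.writeInt(element);
            }
        }
        return baos.toByteArray();
    }

    // Analogous to the reader side: deserialize one element per emit call.
    public static List<Integer> replay(byte[] serialized, int count) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(in.readInt());
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> elements = Arrays.asList(1, 2, 3, 4);
        byte[] bytes = serializeElements(elements);
        System.out.println(replay(bytes, elements.size())); // prints [1, 2, 3, 4]
    }
}
```

Because the elements travel as serialized bytes, shipping the function via Java serialization does not require the elements themselves to be `Serializable` — which is the motivation stated in the quoted Javadoc.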
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757864 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction +implements GeneratorFunction, OutputTypeConfigurable { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer serializer; + +/** The actual data elements, in serialized form. 
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer serializer, Iterable elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +ByteArrayInputSt
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757728 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. Review Comment: >Theoretically we could also fix this (in a follow-up!) but it doesn't seem worth the overhead given the number of elements. Oh I think this already works; see below comment. Good point, I mainly decided to restrict it because this is what the current underlying SourceFunction was delivering (FromElementsFunction). I am not sure what was the intention to limit it to the parallelism of 1. 
## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import ja
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java: ## @@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() { int maxParallelism = 42; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -DataStream input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128); -DataStream input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129); +DataStream input1 = env.fromSequence(1, 4).setMaxParallelism(128); +DataStream input2 = env.fromSequence(1, 4).setMaxParallelism(129); Review Comment: Yes, this fixes test failures that arise because of this: https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307 the SingleOutputStreamOperator caps max parallelism to 1. Since this PR is already pretty sizable, it seemed appropriate to postpone dealing with this until we work on `fromSequence`. ## flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5: ## @@ -19,8 +19,8 @@ Method calls method in (RecreateOnResetOperatorCoordinator.java:361) Method calls method in (TaskManagerConfiguration.java:244) Method calls method in (TaskManagerConfiguration.java:246) -Method calls method in (TaskManagerServices.java:433) -Method calls method in (TaskManagerServices.java:431) Review Comment: Indeed. I had to enable refreeze to add missing datagen source violations, but how exactly it is supposed to work in archunit is still a bit of a mystery to me, to be honest. 
## flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e: ## @@ -64,6 +64,19 @@ Constructor (int, org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, org.apache.flink.connector.base.source.reader.splitreader.SplitReader, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, boolean)> calls method in (SplitFetcher.java:97) Constructor (org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, java.util.function.Supplier, org.apache.flink.configuration.Configuration, java.util.function.Consumer)> is annotated with in (SplitFetcherManager.java:0) Constructor (int)> calls method in (FutureCompletingBlockingQueue.java:114) +Constructor (org.apache.flink.api.common.typeutils.TypeSerializer, java.lang.Iterable)> calls method in (FromElementsGeneratorFunction.java:85) Review Comment: That's what I thought too. As also tracked in the wiki, the DataGen will not be externalized: https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32986][test] Fix createTemporaryFunction type inference error [flink]
jeyhunkarimov commented on PR #23586: URL: https://github.com/apache/flink/pull/23586#issuecomment-1777952204 Hi @snuyanzin, could you please review the PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33321][hotfix] VertexFlameGraphFactoryTest#verifyRecursively doesn't work on java 21 [flink]
snuyanzin merged PR #23583: URL: https://github.com/apache/flink/pull/23583 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32896][Runtime/Coordination] Incorrect `Map.computeIfAbsent(..., ...::new)` usage which misinterprets key as initial capacity [flink]
tzy-0x7cf commented on PR #23518: URL: https://github.com/apache/flink/pull/23518#issuecomment-1777866572 > Could you squash the commits and rebase the branch to the most-recent `master`? We don't want to have merge commits cluttering the git history. > > The Flink CI bot is known to have issues with force-pushes. You can try to work around it by pushing an empty commit after you've reorganized and rebased the branch (in a separate push, to be on the safe side). Thanks Matthias! I'm new to Flink and would like to contribute more, so if there are any test-related or simple issues, please feel free to assign them to me! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
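The squash-and-rebase workflow suggested above can be sketched as follows. Branch and remote names are placeholders; `git reset --soft` is shown as a non-interactive way to squash (the usual interactive route is `git rebase -i origin/master`).

```shell
# Sketch of the suggested workflow (branch name "my-branch" is a placeholder).
# 1) Squash the branch's commits into one: soft-reset to the base and recommit.
#    All changes stay staged; only the commit history is collapsed.
git fetch origin
git reset --soft origin/master
git commit -m "[FLINK-32896] Fix Map.computeIfAbsent misuse"

# 2) Rebase onto the most recent master (a no-op right after the reset above,
#    but shown for the general case):
git rebase origin/master

# 3) Work around the CI bot's force-push issue: force-push the rewritten
#    branch first, then push an empty commit in a separate push.
git push --force-with-lease origin my-branch
git commit --allow-empty -m "Trigger CI"
git push origin my-branch
```

`--force-with-lease` is used instead of a plain `--force` so the push fails if someone else updated the remote branch in the meantime.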
Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]
dalelane commented on PR #23395: URL: https://github.com/apache/flink/pull/23395#issuecomment-1777847541 Thanks for the reviews - much appreciated 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]
dalelane commented on code in PR #23395: URL: https://github.com/apache/flink/pull/23395#discussion_r1370689647 ## flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java: ## @@ -222,12 +234,14 @@ void testSerializeDeserializeBasedOnNestedSchema() throws Exception { AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema( rowType, - AvroSerializationSchema.forGeneric(nullableOuterSchema), +AvroSerializationSchema.forGeneric( Review Comment: good point - updated the test in the same way with ParameterizedTest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]
dalelane commented on code in PR #23395: URL: https://github.com/apache/flink/pull/23395#discussion_r1370649857 ## flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java: ## @@ -37,46 +38,56 @@ class AvroDeserializationSchemaTest { private static final Address address = TestDataGenerator.generateRandomAddress(new Random()); +private static final AvroEncoding[] ENCODINGS = {AvroEncoding.BINARY, AvroEncoding.JSON}; + @Test void testNullRecord() throws Exception { Review Comment: Nice - that's a new one on me, thanks. That's much better -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data [flink-connector-kafka]
tzulitai closed pull request #52: [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data URL: https://github.com/apache/flink-connector-kafka/pull/52 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33187) Don't record duplicate event if no change
[ https://issues.apache.org/jira/browse/FLINK-33187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779195#comment-17779195 ] Gyula Fora commented on FLINK-33187: merged to main faaff564e1bb3d8ca51c939d34dd416585a3de74 > Don't record duplicate event if no change > - > > Key: FLINK-33187 > URL: https://issues.apache.org/jira/browse/FLINK-33187 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Affects Versions: 1.17.1 >Reporter: Clara Xiong >Assignee: Clara Xiong >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.7.0 > > > Problem: > Some events are recorded repeatedly, such as ScalingReport when autoscaling is > not enabled; these constitute 99% of all events in our prod env. This wastes > resources and degrades performance downstream. > Proposal: > Suppress duplicate events within an interval defined by a new operator config > "scaling.report.interval" in seconds, defaulting to 1800. -- This message was sent by Atlassian Jira (v8.20.10#820010)
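The interval-based suppression proposed in FLINK-33187 can be sketched independently of the operator code. This is a plain-Java illustration; the class and method names are invented, not the actual autoscaler implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Sketch: re-record an identical event only if the configured interval
// (e.g. scaling.report.interval = 1800s) has elapsed since it was last sent.
// All names here are illustrative, not the actual autoscaler classes.
public class EventDeduper {
    private final Duration interval;
    private final Map<String, Instant> lastSent = new HashMap<>();

    public EventDeduper(Duration interval) {
        this.interval = interval;
    }

    /** Returns true if the event should be recorded now, false if suppressed. */
    public boolean shouldRecord(String eventKey, Instant now) {
        Instant last = lastSent.get(eventKey);
        if (last != null && Duration.between(last, now).compareTo(interval) < 0) {
            return false; // identical event within the interval: suppress
        }
        lastSent.put(eventKey, now);
        return true;
    }
}
```

The follow-up fix tracked in the PR below replaces object comparison with a hashcode of the parallelism map so that "identical" is decided cheaply per event key.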
Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]
gyfora merged PR #685: URL: https://github.com/apache/flink-kubernetes-operator/pull/685 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]
clarax commented on code in PR #685: URL: https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370455045 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -201,8 +201,8 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); -public static final ConfigOption SCALING_REPORT_INTERVAL = -autoScalerConfig("scaling.report.interval") +public static final ConfigOption SCALING_EVENT_INTERVAL = +autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofSeconds(1800)) .withDescription("Time interval to resend the identical event"); Review Comment: > As this comment[1] mentioned: `all config keys to work with the "old" syntax at least in the 1.7.0 release.`, so please update it, thanks! > > Sorry for that, I didn't support the old key during decoupling autoscaler and kubernetes-operator. > > [#686 (comment)](https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1369777698) updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process
[ https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779150#comment-17779150 ] Jing Ge edited comment on FLINK-33301 at 10/24/23 3:45 PM: --- The value(my intention) was to fail faster before maven is called, like I mentioned, the entry point is script not maven. IMHO, anything we built to improve the efficiency will need to be maintained. That is the cost. The key point is how often the built thing will be used vs. how often it need to be maintained. Afaik, the versions of Java or maven might need to be changed/maintained once every 2 years or even longer. But, I got your point. Your thoughts are rational too. Let's stay with your solution and we could still come back after we got more feeling/feedback about it. WDYT? was (Author: jingge): The value(my intention) was to fail faster before maven is called, like I mentioned the entry point is script not maven. IMHO, anything we built to improve the efficiency will need to be maintained. That is the cost. The key point is how often the built thing will be used vs. how often it need to be maintained. Afaik, the versions of Java or maven might need to be changed/maintained once every 2 years or even longer. But, I got your point. Your thoughts are rational too. Let's stay with your solution and we could still come back after we got more feeling/feedback about it. WDYT? > Add Java and Maven version checks in the bash script of Flink release process > - > > Key: FLINK-33301 > URL: https://issues.apache.org/jira/browse/FLINK-33301 > Project: Flink > Issue Type: Bug > Components: Release System >Affects Versions: 1.18.0, 1.19.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > > During the release, Flink requires specific version of Java and Maven[1]. It > makes sense to check those versions at the very beginning of some bash > scripts to let it fail fast and therefore improve the efficiency. 
> > [1][https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0|https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31863) Add 'Hostname' enum val to k8s NodeAddress type.
[ https://issues.apache.org/jira/browse/FLINK-31863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-31863: -- Fix Version/s: (was: 1.17.2) > Add 'Hostname' enum val to k8s NodeAddress type. > > > Key: FLINK-31863 > URL: https://issues.apache.org/jira/browse/FLINK-31863 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Affects Versions: 1.17.0 > Environment: OS: CentOS 7 > Kubernetes: v1.18.5 >Reporter: Vince.Feng >Priority: Major > Labels: easyfix, kubernetes, pull-request-available > Attachments: image-2023-04-20-17-53-30-969.png > > Original Estimate: 1h > Remaining Estimate: 1h > > Class io.fabric8.kubernetes.api.model.NodeAddress.type contains > 'InternalIP','ExternalIP' and 'Hostname'. The InternalIP address is > unavailable in the private cloud environment. But the hostname can be > resolved by the DNS server. So > 'org.apache.flink.kubernetes.configuration.NodePortAddressType.NodePortAddressType' > should add 'Hostname' enumeration value. > {code:java} > //org.apache.flink.kubernetes.configuration.NodePortAddressType.NodePortAddressType > public enum NodePortAddressType { > InternalIP, > ExternalIP, > Hostname > } {code} > > !image-2023-04-20-17-53-30-969.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
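As a sketch of how the extended enum could be used (class and method names here are hypothetical, not Flink's actual code), the configured address type would simply select the matching entry from the node's address list — in a private cloud the `InternalIP` entry may be unreachable while the `Hostname` entry resolves via DNS:

```java
// Hypothetical sketch: choosing a node address by the configured
// NodePortAddressType, assuming the enum gains the proposed 'Hostname' value.
import java.util.List;
import java.util.Optional;

public class NodeAddressSelector {

    // Mirrors the enum proposed in the issue, including the new Hostname value.
    public enum NodePortAddressType { InternalIP, ExternalIP, Hostname }

    // Stand-in for io.fabric8.kubernetes.api.model.NodeAddress (type + address).
    public record NodeAddress(String type, String address) {}

    // Return the first address whose type matches the configured preference.
    public static Optional<String> select(
            List<NodeAddress> addresses, NodePortAddressType preferred) {
        return addresses.stream()
                .filter(a -> preferred.name().equals(a.type()))
                .map(NodeAddress::address)
                .findFirst();
    }

    public static void main(String[] args) {
        List<NodeAddress> addresses = List.of(
                new NodeAddress("InternalIP", "10.0.0.5"),
                new NodeAddress("Hostname", "node-1.cluster.local"));
        // In a private cloud the InternalIP may be unreachable, so prefer Hostname.
        System.out.println(select(addresses, NodePortAddressType.Hostname).orElse("none"));
    }
}
```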
[jira] [Updated] (FLINK-31601) While waiting for resources, resources check might be scheduled unlimited number of times (Adaptive Scheduler)
[ https://issues.apache.org/jira/browse/FLINK-31601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-31601: -- Fix Version/s: (was: 1.17.2) > While waiting for resources, resources check might be scheduled unlimited > number of times (Adaptive Scheduler) > -- > > Key: FLINK-31601 > URL: https://issues.apache.org/jira/browse/FLINK-31601 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: Roman Khachatryan >Priority: Minor > > See [https://github.com/apache/flink/pull/22169#discussion_r1136395017] > {quote}when {{resourceStabilizationDeadline}} is not null, should we skip > scheduling {{checkDesiredOrSufficientResourcesAvailable}} (on [line > 166|https://github.com/apache/flink/blob/a64781b1ef8f129021bdcddd3b07548e6caa4a72/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java#L166])? > Otherwise, we schedule as many checks as there are changes in resources. > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
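The guard suggested in the linked review comment can be sketched as follows (a simplified stand-in with invented names, not the actual `WaitingForResources` code): a new check is only scheduled when no stabilization deadline is already pending, so repeated resource-change notifications do not queue an unbounded number of checks.

```java
// Illustrative sketch of the proposed guard: schedule at most one pending
// resource check, regardless of how many resource changes arrive.
import java.time.Duration;
import java.time.Instant;

public class ResourceCheckScheduler {
    private Instant resourceStabilizationDeadline; // null = no check pending
    private int scheduledChecks = 0;

    // Called for every resource change notification.
    public void onNewResourcesAvailable() {
        if (resourceStabilizationDeadline != null) {
            return; // a check is already scheduled; don't pile up more
        }
        resourceStabilizationDeadline = Instant.now().plus(Duration.ofSeconds(10));
        scheduledChecks++; // stand-in for scheduling checkDesiredOrSufficientResourcesAvailable()
    }

    public int getScheduledChecks() {
        return scheduledChecks;
    }

    public static void main(String[] args) {
        ResourceCheckScheduler scheduler = new ResourceCheckScheduler();
        for (int i = 0; i < 5; i++) {
            scheduler.onNewResourcesAvailable(); // five resource changes...
        }
        System.out.println(scheduler.getScheduledChecks()); // ...but only one check scheduled
    }
}
```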
[jira] [Comment Edited] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process
[ https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779101#comment-17779101 ] Jing Ge edited comment on FLINK-33301 at 10/24/23 3:45 PM: --- Thanks for sharing your thoughts. I am trying to share my experience and the feeling that most release managers will face the same issues. Here are my thoughts: # Yes, using the pom to control the versions centrally is a good idea. But the current Flink release is not purely Maven based; it contains many scripts beyond Maven. If someone like me has the wrong versions of Java or Maven, the failure only surfaces when mvn -Prelease is called. # During the Flink 1.18 release I ran the release process 4 times, with other work in between. Since Flink is an open source project, I guess what I was facing is very common: release managers have other work in parallel, and Java and Maven versions get switched back and forth depending on which project the release manager is working on. Each time I switched back to the Flink release, I would not always call mvn -Prelease, because I had already done the preparation. I just jumped into the "Build a release candidate" section. All scripts described on the wiki page will be executed. In fact, I created my own protocol/program to execute the release in order to move faster without reading the long content. At that time, my only focus was those scripts. I had the version issue twice more in addition to the one detected by rc2. As a release manager, I am keen to have the check at the script level, because it is the entry point of my work. I might have my own script on top of the official scripts to save even more time, like I did with the PR, to get the following error messages (as an example) at the very beginning of any script I might call: {code:java} Java version is incorrect. Required version: 1.8, but it is 17.0.8 Maven version is incorrect. Required version: 3.8.6, but it is 3.9.1{code} Since the output is controlled by us, we can tell release managers anything we want to help them work more efficiently. This is my personal experience; I want to save all future release managers the time I could have saved for myself. I am not sure Maven could help me in the way I require. > Add Java and Maven version checks in the bash script of Flink release process > - > > Key: FLINK-33301 > URL: https://issues.apache.org/jira/browse/FLINK-33301 > Project: Flink > Issue Type: Bug > Components: Release System >Affects Versions: 1.18.0, 1.19.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available
[jira] [Updated] (FLINK-31875) OSS throws NoClassDefFoundError due to old hadoop-common version
[ https://issues.apache.org/jira/browse/FLINK-31875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-31875: -- Fix Version/s: (was: 1.16.3) (was: 1.17.2) > OSS throws NoClassDefFoundError due to old hadoop-common version > - > > Key: FLINK-31875 > URL: https://issues.apache.org/jira/browse/FLINK-31875 > Project: Flink > Issue Type: Bug > Components: FileSystems >Affects Versions: 1.16.0, 1.17.0 >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available, stale-assigned > > h2. Problem > When using OSS in 1.17, an exception will be thrown: > {code:java} > java.lang.NoClassDefFoundError: > org/apache/hadoop/thirdparty/com/google/common/base/Preconditions > at > org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption(AliyunOSSUtils.java:221) > at > org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:343) > at > org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:147) > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508) > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274){code} > It can be reproduced in the OSS ITCase if certain environment variables have been configured. > h2. Why > After https://issues.apache.org/jira/browse/FLINK-27308 and > https://issues.apache.org/jira/browse/FLINK-29502, hadoop-aliyun has also been > upgraded to 3.3.4, which relies on the newest version of hadoop-common. > OSS still uses the old version (2.10.2) inherited from flink-parent, so > some classes cannot be found.
[jira] [Updated] (FLINK-30219) Fetch results API in SQL gateway returns error result.
[ https://issues.apache.org/jira/browse/FLINK-30219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-30219: -- Fix Version/s: (was: 1.16.3) > Fetch results API in SQL gateway returns error result. > - > > Key: FLINK-30219 > URL: https://issues.apache.org/jira/browse/FLINK-30219 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: Aiden Gong >Assignee: Aiden Gong >Priority: Critical > Labels: pull-request-available, stale-assigned > Attachments: image-2022-11-26-10-38-02-270.png > > > !image-2022-11-26-10-38-02-270.png!
[jira] [Updated] (FLINK-33089) Drop Flink 1.13 and 1.14 support for the operator
[ https://issues.apache.org/jira/browse/FLINK-33089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-33089: --- Description: As agreed with the community we will only support the last 4 stable Flink minor versions. With Flink 1.18 already out, we should drop 1.13 and 1.14 support from the operator. This includes any special codepaths required and we should probably throw a validation error and short-circuit reconciliation on unsupported versions to signal to users and avoid any accidental deployment problems. was: As agreed with the community we will only support the last 4 stable Flink minor versions. With Flink 1.17 already out, we should drop 1.13 support from the operator. This includes any special codepaths required and we should probably throw a validation error and short-circuit reconciliation on unsupported versions to signal to users and avoid any accidental deployment problems. > Drop Flink 1.13 and 1.14 support for the operator > - > > Key: FLINK-33089 > URL: https://issues.apache.org/jira/browse/FLINK-33089 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Fix For: kubernetes-operator-1.7.0 > > > As agreed with the community we will only support the last 4 stable Flink > minor versions. > With Flink 1.18 already out, we should drop 1.13 and 1.14 support from the > operator. > This includes any special codepaths required and we should probably throw a > validation error and short-circuit reconciliation on unsupported versions to > signal to users and avoid any accidental deployment problems. -- This message was sent by Atlassian Jira (v8.20.10#820010)
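The issue proposes throwing a validation error and short-circuiting reconciliation for unsupported versions. A minimal sketch of such a check (class name, method, and the supported set are assumptions, not the operator's actual validator) could look like:

```java
// Hedged sketch: reject deployments targeting a Flink minor version the
// operator no longer supports, so reconciliation can short-circuit early
// instead of attempting a deployment that will fail later.
import java.util.Optional;
import java.util.Set;

public class FlinkVersionValidator {
    // Last four stable minors once 1.18 is out (per the issue description).
    private static final Set<String> SUPPORTED = Set.of("1.15", "1.16", "1.17", "1.18");

    // Returns a validation error message for unsupported versions, empty otherwise.
    public static Optional<String> validate(String flinkVersion) {
        if (SUPPORTED.contains(flinkVersion)) {
            return Optional.empty();
        }
        return Optional.of(
                "Flink version " + flinkVersion + " is no longer supported by the operator; "
                        + "supported versions: " + SUPPORTED);
    }

    public static void main(String[] args) {
        System.out.println(validate("1.13").isPresent()); // validation error -> true
        System.out.println(validate("1.17").isPresent()); // passes -> false
    }
}
```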
[jira] [Updated] (FLINK-33089) Drop Flink 1.13 and 1.14 support for the operator
[ https://issues.apache.org/jira/browse/FLINK-33089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-33089: --- Summary: Drop Flink 1.13 and 1.14 support for the operator (was: Drop Flink 1.13 support) > Drop Flink 1.13 and 1.14 support for the operator > - > > Key: FLINK-33089 > URL: https://issues.apache.org/jira/browse/FLINK-33089 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Fix For: kubernetes-operator-1.7.0 > > > As agreed with the community we will only support the last 4 stable Flink > minor versions. > With Flink 1.17 already out, we should drop 1.13 support from the operator. > This includes any special codepaths required and we should probably throw a > validation error and short-circuit reconciliation on unsupported versions to > signal to users and avoid any accidental deployment problems. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process
[ https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779150#comment-17779150 ] Jing Ge commented on FLINK-33301: - The value (my intention) is to fail faster, before Maven is called; as I mentioned, the entry point is the script, not Maven. IMHO, anything we build to improve efficiency will need to be maintained. That is the cost. The key question is how often the built thing will be used vs. how often it needs to be maintained. Afaik, the Java or Maven versions might need to be changed/maintained once every 2 years or even longer. But I got your point, and your thoughts are rational too. Let's stay with your solution; we can still come back after we get more feedback about it. WDYT? > Add Java and Maven version checks in the bash script of Flink release process > - > > Key: FLINK-33301 > URL: https://issues.apache.org/jira/browse/FLINK-33301 > Project: Flink > Issue Type: Bug > Components: Release System >Affects Versions: 1.18.0, 1.19.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > > During the release, Flink requires specific versions of Java and Maven[1]. It > makes sense to check those versions at the very beginning of some bash > scripts to let them fail fast and therefore improve efficiency. > > [1][https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0|https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0]
[jira] [Created] (FLINK-33353) SQL fails because "TimestampType.kind" is not serialized
Ferenc Csaky created FLINK-33353: Summary: SQL fails because "TimestampType.kind" is not serialized Key: FLINK-33353 URL: https://issues.apache.org/jira/browse/FLINK-33353 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.18.0 Reporter: Ferenc Csaky We have a custom persistent catalog store, which stores tables, views, etc. in a DB. In our application it is required to utilize the serialized formats of entities, but the same applies to Hive, as it also functions as a persistent catalog. Take the following example SQL: {code:sql} CREATE TABLE IF NOT EXISTS `txn_gen` ( `txn_id` INT, `amount` INT, `ts` TIMESTAMP(3), WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND ) WITH ( 'connector' = 'datagen', 'fields.txn_id.min' = '1', 'fields.txn_id.max' = '5', 'rows-per-second' = '1' ); CREATE VIEW IF NOT EXISTS aggr_ten_sec AS SELECT txn_id, TUMBLE_ROWTIME(`ts`, INTERVAL '10' SECOND) AS w_row_time, COUNT(txn_id) AS txn_count FROM txn_gen GROUP BY txn_id, TUMBLE(`ts`, INTERVAL '10' SECOND); SELECT txn_id, SUM(txn_count), TUMBLE_START(w_row_time, INTERVAL '20' SECOND) AS total_txn_count FROM aggr_ten_sec GROUP BY txn_id, TUMBLE(w_row_time, INTERVAL '20' SECOND); {code} This works without any problems when we simply execute it in a {{TableEnvironment}}, but it fails with the below error when we try to execute the query based on the serialized table metadata: {code} org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. {code} If a view requires the use of ROWTIME, that information will be lost, and we cannot recreate the same query from the serialized entities. Currently, the "kind" field in {{TimestampType}} is deliberately annotated as {{@Internal}} and is not serialized, even though this breaks the functionality described above.
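The effect of dropping the internal "kind" during serialization can be shown with a self-contained sketch (a simplified stand-in, not Flink's actual type serializer): after a round trip, the rowtime column degrades to a plain TIMESTAMP(3), which is exactly why the window aggregate rejects it.

```java
// Minimal illustration: a serializer that omits the @Internal "kind" field
// loses the ROWTIME property on deserialization.
public class TimestampKindRoundTrip {

    enum TimestampKind { REGULAR, ROWTIME, PROCTIME }

    record TimestampType(int precision, TimestampKind kind) {
        // Mimics a serializer that only emits the visible part of the type.
        String serialize() {
            return "TIMESTAMP(" + precision + ")";
        }
        static TimestampType deserialize(String s) {
            int p = Integer.parseInt(s.substring(s.indexOf('(') + 1, s.indexOf(')')));
            return new TimestampType(p, TimestampKind.REGULAR); // kind is lost
        }
    }

    public static void main(String[] args) {
        TimestampType rowtime = new TimestampType(3, TimestampKind.ROWTIME);
        TimestampType restored = TimestampType.deserialize(rowtime.serialize());
        // The restored type is no longer a time attribute, so a window aggregate
        // over it fails with "can only be defined over a time attribute column".
        System.out.println(restored.kind()); // REGULAR, not ROWTIME
    }
}
```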
Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]
gyfora commented on code in PR #685: URL: https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370421339 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java: ## @@ -197,4 +198,19 @@ private static Event buildEvent( .endMetadata() .build(); } + +private static boolean intervalCheck(Event existing, @Nullable Duration interval) { +return interval != null +&& Instant.now() +.isBefore( +Instant.parse(existing.getLastTimestamp()) +.plusMillis(interval.toMillis())); +} + +private static boolean labelCheck( +Event existing, Predicate<Map<String, String>> dedupePredicate) { +return dedupePredicate == null +|| (existing.getMetadata() != null +&& dedupePredicate.test(existing.getMetadata().getLabels())); +} Review Comment: makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]
clarax commented on code in PR #685: URL: https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370421148 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -201,8 +201,8 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); -public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL = -autoScalerConfig("scaling.report.interval") +public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL = +autoScalerConfig("scaling.event.interval") Review Comment: updated.
Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]
clarax commented on code in PR #685: URL: https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370420200 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java: ## @@ -197,4 +198,19 @@ private static Event buildEvent( .endMetadata() .build(); } + +private static boolean intervalCheck(Event existing, @Nullable Duration interval) { +return interval != null +&& Instant.now() +.isBefore( +Instant.parse(existing.getLastTimestamp()) +.plusMillis(interval.toMillis())); +} + +private static boolean labelCheck( +Event existing, Predicate<Map<String, String>> dedupePredicate) { +return dedupePredicate == null +|| (existing.getMetadata() != null +&& dedupePredicate.test(existing.getMetadata().getLabels())); +} Review Comment: When interval == null, we don't dedupe; this is intentional. It is documented in the interface method's param comment and unit tested. But when the config is null, we use the default value of 30 min. That is also documented and unit tested.
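For illustration, the two dedup guards discussed in this review can be restated in a standalone, runnable form (simplified signatures are an assumption — the real methods take a fabric8 `Event`): an event is suppressed when it is still within the configured interval of the previous occurrence and the dedup predicate matches its labels.

```java
// Standalone sketch of the interval/label dedup checks; a null interval means
// "never dedupe by time" and a null predicate means labels never block dedup.
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.function.Predicate;

public class EventDedup {

    static boolean intervalCheck(Instant lastTimestamp, Duration interval) {
        // true while the previous occurrence is still within the dedup interval
        return interval != null && Instant.now().isBefore(lastTimestamp.plus(interval));
    }

    static boolean labelCheck(Map<String, String> labels,
                              Predicate<Map<String, String>> dedupePredicate) {
        return dedupePredicate == null || (labels != null && dedupePredicate.test(labels));
    }

    public static void main(String[] args) {
        Instant aMinuteAgo = Instant.now().minusSeconds(60);
        System.out.println(intervalCheck(aMinuteAgo, Duration.ofMinutes(30))); // true: within 30 min
        System.out.println(intervalCheck(aMinuteAgo, null));                   // false: no time dedup
        System.out.println(labelCheck(Map.of("reason", "ScalingReport"),
                labels -> "ScalingReport".equals(labels.get("reason"))));      // true: labels match
    }
}
```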
[jira] [Closed] (FLINK-33306) Use observed true processing rate when source metrics are incorrect
[ https://issues.apache.org/jira/browse/FLINK-33306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-33306. -- Resolution: Fixed merged to main cc680e142bb8d52c4db215658ee7f4c4159a0fe4 > Use observed true processing rate when source metrics are incorrect > --- > > Key: FLINK-33306 > URL: https://issues.apache.org/jira/browse/FLINK-33306 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Critical > Labels: pull-request-available > Fix For: kubernetes-operator-1.7.0 > > > The aim is to address cases where Flink incorrectly reports low busy time > (high idleness) for sources that in fact cannot keep up due to the > slowness of the reader/fetchers. As the metrics cannot generally be fixed on > the Flink/connector side, we have to detect this and handle it when > collecting the metrics. > The main symptom of this problem is overestimation of the true processing > rate and not triggering scaling even if lag is building up, because the autoscaler > thinks it will be able to keep up. > To tackle this we differentiate two methods of TPR measurement: > # *Busy-time based TPR* (the current approach in the autoscaler): > computed from incoming records and busy time > # *Observed TPR*: computed from incoming records and back pressure, > measurable only when we can assume full processing throughput (i.e. during > catch-up) > h3. Current behaviour > The operator currently always uses a busy-time based TPR calculation, which is > very flexible and allows for scaling up/down but is susceptible to > overestimation due to the broken metrics. > h3. Suggested new behaviour > Instead of always using the busy-time based TPR, we detect when TPR is overestimated > (busy time too low) and switch to observed TPR. > To do this, whenever there is lag for a source (during catch-up or > lag build-up) we measure both the busy-time based and the observed TPR. 
> If the avg busy-time based TPR is off by a configured amount, we switch to > observed TPR for this source during metric evaluation. > *Why not use observed TPR all the time?* > Observed TPR can only be measured when we are catching up (during > stabilization) or when we cannot keep up. This makes it harder to scale down or > to detect changes in source throughput over time (before lag starts to build > up). Instead of using observed TPR all the time, we switch to it only when we detect a > problem with the busy time (a rare case overall), to hopefully get > the best of both worlds.
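The two measurement methods and the switch-over rule described above can be sketched numerically (method names and the threshold are assumptions for illustration, not the operator's actual code):

```java
// Hedged numeric sketch of busy-time based vs. observed true processing rate
// (TPR) and the fallback rule when busy time is underreported.
public class TrueProcessingRate {

    // Busy-time based TPR: throughput scaled up by the reported busy fraction
    // (busyTimeMsPerSec is in [0, 1000], i.e. busy milliseconds per second).
    static double busyTimeBasedTpr(double recordsPerSec, double busyTimeMsPerSec) {
        return recordsPerSec / (busyTimeMsPerSec / 1000.0);
    }

    // Observed TPR: during catch-up the source runs at full throughput except
    // while back-pressured, so throughput over non-backpressured time directly
    // observes the processing rate.
    static double observedTpr(double recordsPerSec, double backpressureMsPerSec) {
        return recordsPerSec / (1.0 - backpressureMsPerSec / 1000.0);
    }

    // Switch to the observed value when the busy-time estimate exceeds the
    // observation by more than a configured ratio.
    static double effectiveTpr(double busyTimeTpr, double observedTpr, double switchThreshold) {
        return busyTimeTpr > observedTpr * switchThreshold ? observedTpr : busyTimeTpr;
    }

    public static void main(String[] args) {
        // Broken metrics: source processes 1000 rec/s but reports only 100 ms/s busy.
        double busy = busyTimeBasedTpr(1000, 100);   // 10000 rec/s, wildly overestimated
        double observed = observedTpr(1000, 0);      // 1000 rec/s during catch-up
        System.out.println(effectiveTpr(busy, observed, 1.5)); // falls back to 1000.0
    }
}
```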
Re: [PR] [FLINK-33306] Use observed source throughput as true processing rate [flink-kubernetes-operator]
gyfora merged PR #686: URL: https://github.com/apache/flink-kubernetes-operator/pull/686 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33274][release] Add release note for version 1.18 [flink]
JingGe merged PR #23527: URL: https://github.com/apache/flink/pull/23527 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33341) Use available local state for rescaling
[ https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779129#comment-17779129 ] Stefan Richter commented on FLINK-33341: FYI, here is a link to the development branch: https://github.com/apache/flink/compare/master...StefanRRichter:flink:srichter-local-rescaling-FLINK-33341 > Use available local state for rescaling > --- > > Key: FLINK-33341 > URL: https://issues.apache.org/jira/browse/FLINK-33341 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > Local state is currently only used for recovery. However, it would make sense > to also use available local state in rescaling scenarios to reduce the amount > of data to download from remote storage. -- This message was sent by Atlassian Jira (v8.20.10#820010)
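A minimal sketch of the idea (all names invented for illustration; the real implementation on the linked branch works on state handles, not bare ranges): when rescaling, the overlap between the task's new key-group range and locally available state can be served from disk, and only the remainder needs to be downloaded from remote storage.

```java
// Hypothetical sketch: count how many key groups of the new range are covered
// by local state and thus need no remote download during rescaling.
public class LocalRescaleSelection {

    record KeyGroupRange(int start, int end) { // inclusive bounds
        boolean overlaps(KeyGroupRange o) { return start <= o.end && o.start <= end; }
        KeyGroupRange intersect(KeyGroupRange o) {
            return new KeyGroupRange(Math.max(start, o.start), Math.min(end, o.end));
        }
        int size() { return end - start + 1; }
    }

    // Number of key groups that do NOT need to be fetched from remote storage.
    static int locallyServable(KeyGroupRange newRange, KeyGroupRange localRange) {
        return newRange.overlaps(localRange) ? newRange.intersect(localRange).size() : 0;
    }

    public static void main(String[] args) {
        // The task previously owned key groups 0-63 locally; after scaling
        // down it now owns 0-95, so only 64-95 must be downloaded.
        System.out.println(locallyServable(new KeyGroupRange(0, 95), new KeyGroupRange(0, 63)));
    }
}
```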
[jira] [Commented] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process
[ https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779122#comment-17779122 ] Matthias Pohl commented on FLINK-33301: --- {quote} Yes, using pom to control the versions centrally is a good idea. But the current Flink release is not pure maven based, it contains many scripts beyond Maven. If someone like me has wrong versions of Java or Maven, it can only be failed at the time when maven -Prelease is called. {quote} That is why we have the script. So that you don't have to call {{mvn -Prelease}} on your own. {quote} During Flink 1.18 release I ran the release process 4 times and I have other works in between. Since Flink is an open source project, I guess what I was facing is very common, release managers will have other works in parallel. Java and Maven versions will be switched back and forth depending on which project the release manager is working on. Each time, when I switched back to Flink release, I would not always call mvn -Prelease, because I already did the preparation. I just jumped into the "Build a release candidate" section. All scripts described on the wiki page will be executed. And the fact is that I created my own protocol/program to execute the release in order to move faster without read the long content. At that time, my only focus was those scripts. I had the version issues twice more in addition to the one detected by rc2. As a release manager, I am keen to have the check at script level, because it is the entry point of my work. I might have my own script on top of the official scripts to save even more time. Like I did with the PR to have the following error messages(as example) at very beginning with any script I might call: {quote} Just for the record: the only value you're adding is that the error is raised when calling {{tools/releasing/create_source_release.sh}}. 
The other two scripts already print the error (because they call Maven's enforcer plugin). Additionally, you're introducing a helper script that can be used if you're not familiar with the Maven features for calling the version enforcement correctly. In contrast, you're adding one additional artifact where we have to maintain the Maven and Java versions. You could adapt your script so that it uses the Maven command I shared in my comment above. This way, we don't introduce new code locations with the versions hard-coded but rely on the Maven configuration. That keeps maintainability at the current level. You could even grep for the relevant output if you're concerned about the verbose ERROR output: {{mvn -q -Prelease -pl flink-annotations validate | grep -v ERROR}}. WDYT? > Add Java and Maven version checks in the bash script of Flink release process > - > > Key: FLINK-33301 > URL: https://issues.apache.org/jira/browse/FLINK-33301 > Project: Flink > Issue Type: Bug > Components: Release System >Affects Versions: 1.18.0, 1.19.0 >Reporter: Jing Ge >Assignee: Jing Ge >Priority: Minor > Labels: pull-request-available > > During the release, Flink requires specific versions of Java and Maven[1]. It > makes sense to check those versions at the very beginning of some bash > scripts to let them fail fast and therefore improve efficiency. > > [1][https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0|https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0]
[jira] [Comment Edited] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process
[ https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779122#comment-17779122 ] Matthias Pohl edited comment on FLINK-33301 at 10/24/23 2:52 PM: - {quote} Yes, using pom to control the versions centrally is a good idea. But the current Flink release is not pure maven based, it contains many scripts beyond Maven. If someone like me has wrong versions of Java or Maven, it can only be failed at the time when maven -Prelease is called. {quote} That is why we have the script. So that you don't have to call {{mvn -Prelease}} on your own. {quote} During Flink 1.18 release I ran the release process 4 times and I have other works in between. Since Flink is an open source project, I guess what I was facing is very common, release managers will have other works in parallel. Java and Maven versions will be switched back and forth depending on which project the release manager is working on. Each time, when I switched back to Flink release, I would not always call mvn -Prelease, because I already did the preparation. I just jumped into the "Build a release candidate" section. All scripts described on the wiki page will be executed. And the fact is that I created my own protocol/program to execute the release in order to move faster without read the long content. At that time, my only focus was those scripts. I had the version issues twice more in addition to the one detected by rc2. As a release manager, I am keen to have the check at script level, because it is the entry point of my work. I might have my own script on top of the official scripts to save even more time. Like I did with the PR to have the following error messages(as example) at very beginning with any script I might call: {quote} Just for the record: the only value you're adding is that the error is raised when calling {{tools/releasing/create_source_release.sh}}. 
The other two scripts already print the error (because they call Maven's enforcer plugin). Additionally, you're introducing a helper script that can be used if you're not familiar with the Maven features for calling the version enforcement correctly. In contrast to that, you're adding one additional artifact in which we have to maintain the Maven and Java versions. You could adapt your script so that it uses the Maven command I shared in my comment above. That way, we don't introduce new code locations with hard-coded versions but rely on the Maven configuration, which keeps maintainability at the current level. You could even grep for the relevant output if you're concerned about the verbose ERROR output: {{mvn -q -Prelease -pl flink-annotations validate | grep -v ERROR}}. WDYT?
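The enforcer-based check discussed above could be wired into a release script without hard-coding versions in the script itself. A minimal sketch, assuming a Flink checkout where the {{release}} profile binds the enforcer's version rules and {{flink-annotations}} is a cheap module to validate (as in the comment above); the function name `check_build_env` is made up for illustration:

```shell
#!/usr/bin/env bash
# Hypothetical sketch: fail fast by delegating the Java/Maven version check
# to Maven's enforcer plugin (bound to the release profile) instead of
# duplicating version numbers in the shell script.
set -euo pipefail

check_build_env() {
  # -q keeps Maven quiet on success; enforcer violations are still printed.
  if ! mvn -q -Prelease -pl flink-annotations validate; then
    echo "ERROR: Java/Maven version check failed; see enforcer output above." >&2
    return 1
  fi
  echo "Build environment OK"
}

# A release script would run this before any other step, e.g.:
# check_build_env || exit 1
```

Because the versions live only in the pom, switching the required Java or Maven version later means touching the Maven configuration, not every script.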
[jira] [Commented] (FLINK-33341) Use available local state for rescaling
[ https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779124#comment-17779124 ] Stefan Richter commented on FLINK-33341: [~Yanfei Lei], yes, only the previous local state is available for rescaling, so we might still need to download additional state from remote storage. But often we don't need to download everything: in particular, when we scale out we will often find the complete state locally on some machines and just need to drop some key-groups, and for scale-in we should at least find one piece of the state locally. There is no good reason not to use local state opportunistically in rescaling scenarios as well. No change to the scheduler is needed. > Use available local state for rescaling > --- > > Key: FLINK-33341 > URL: https://issues.apache.org/jira/browse/FLINK-33341 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > Local state is currently only used for recovery. However, it would make sense > to also use available local state in rescaling scenarios to reduce the amount > of data to download from remote storage. -- This message was sent by Atlassian Jira (v8.20.10#820010)
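The key-group overlap described above can be illustrated with the range-assignment formula from Flink's `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex`. A sketch, with max parallelism 128 (Flink's common default) and made-up parallelism values:

```shell
# Sketch: which key-groups a subtask owns, mirroring Flink's
# KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
max_parallelism=128

# args: parallelism, operator index
key_group_start() { echo $(( ($2 * max_parallelism + $1 - 1) / $1 )); }
key_group_end()   { echo $(( (($2 + 1) * max_parallelism - 1) / $1 )); }

# Scaling out from parallelism 2 to 4: new subtask 0 owns key-groups 0..31,
# a subset of old subtask 0's local range 0..63, so its state is already on
# that machine and the surplus key-groups can simply be dropped.
echo "old subtask 0: $(key_group_start 2 0)-$(key_group_end 2 0)"   # 0-63
echo "new subtask 0: $(key_group_start 4 0)-$(key_group_end 4 0)"   # 0-31
```

The same arithmetic shows why scale-in also finds at least part of its range locally: each new, wider range is the union of several old ranges, at least one of which may still sit in the local state directory.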
Re: [PR] [FLINK-33352][rest][docs] Add schema mappings to discriminator properties [flink]
flinkbot commented on PR #23588: URL: https://github.com/apache/flink/pull/23588#issuecomment-1777387430 ## CI report: * 7a802e3654a0e6f2d68cad0deb6af0c4557f082b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org