[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779364#comment-17779364
 ] 

Yun Tang commented on FLINK-33355:
--

I think this is because you forgot to set a uid for each operator. Since the 
`windowAll` operator can only have parallelism 1, all operators will chain 
together once you change the parallelism to 1. Please assign operator IDs as 
described in the documentation: 
https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/savepoints/#assigning-operator-ids
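
For illustration, a minimal sketch against the reproduction job in this thread (the uid strings are placeholders; cdcSource, CustomTrigger and CustomFunction come from the reporter's snippet):
{code:java}
// With explicit uids, savepoint state is mapped by the stable ID instead
// of an auto-generated ID that depends on the topology and chaining.
env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "cdc")
        .uid("cdc-source")
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .trigger(new CustomTrigger<>())
        .apply(new CustomFunction())
        .uid("tumbling-window") // windowAll always runs with parallelism 1
        .print()
        .uid("print-sink");
{code}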

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779358#comment-17779358
 ] 

Yun Tang commented on FLINK-33355:
--

[~edmond_j] How did you assign the parallelism? By setting the configuration 
option `parallelism.default`?

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779359#comment-17779359
 ] 

zhang commented on FLINK-33355:
---

[~yunta] yes

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32107] [Tests] Kubernetes test failed because ofunable to establish ssl connection to github on AZP [flink]

2023-10-24 Thread via GitHub


victor9309 commented on PR #23528:
URL: https://github.com/apache/flink/pull/23528#issuecomment-1778610263

   Thanks @XComp for the review.  
   
   I tested that the next attempt is executed when the download fails.
   
   > can you double-check that wget works properly (i.e. returns an non-zero 
exit code) if accessing the website fails? ...to make the retry logic work.
   
   
![image](https://github.com/apache/flink/assets/18453843/f5b19771-5ef0-4e78-bdd7-fee151e7ccb6)
   
   I moved the download logic into the function, so the only parameter to pass 
is the URL, which makes it more intuitive.
   > shouldn't we be able to generalize it even more? We could move the actual 
download logic into this function as well. The only parameter that should be 
passed would be the URL of the artifact. WDYT?
   
   
![image](https://github.com/apache/flink/assets/18453843/73b328ce-9eda-453b-bb68-d100fa490178)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779352#comment-17779352
 ] 

zhang commented on FLINK-33355:
---

[~yunta] 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SqlServerSourceBuilder.SqlServerIncrementalSource cdcSource = ...;
DataStreamSource cdcStream = env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "cdc");
cdcStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .trigger(new CustomTrigger<>())
        .apply(new CustomFunction())
        .print();
env.execute(); {code}
For the job above, if I reduce the parallelism from n to 1, I encounter the 
previously mentioned error.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SqlServerSourceBuilder.SqlServerIncrementalSource cdcSource = ...;
DataStreamSource cdcStream = env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "cdc");
cdcStream.print();
env.execute(); {code}
But this job doesn't result in an error, so I suspect the problem is caused by 
the window operators.

 

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Update iteration.md | Spelling [flink-ml]

2023-10-24 Thread via GitHub


as1605 opened a new pull request, #257:
URL: https://github.com/apache/flink-ml/pull/257

   Minor spelling and grammar change
   
   
   
   ## What is the purpose of the change
   
   Fixes spelling errors in the documentation
   
   ## Brief change log
   
   - `tarnsmitted` to `transmitted`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] add announcement blog post for Flink 1.18 [flink-web]

2023-10-24 Thread via GitHub


luoyuxia commented on code in PR #680:
URL: https://github.com/apache/flink-web/pull/680#discussion_r1371212872


##
docs/content/posts/2023-10-10-release-1.18.0.md:
##
@@ -0,0 +1,572 @@
+---
+authors:
+- JingGe:
+  name: "Jing Ge"
+  twitter: jingengineer
+- KonstantinKnauf:
+  name: "Konstantin Knauf"
+  twitter: snntrable
+- SergeyNuyanzin:
+  name: "Sergey Nuyanzin"
+  twitter: uckamello
+- QingshengRen:
+  name: "Qingsheng Ren"
+  twitter: renqstuite
+date: "2023-10-10T08:00:00Z"
+subtitle: ""
+title: Announcing the Release of Apache Flink 1.18
+aliases:
+- /news/2023/10/10/release-1.18.0.html
+---
+
+The Apache Flink PMC is pleased to announce the release of Apache Flink 
1.18.0. As usual, we are looking at a packed 
+release with a wide variety of improvements and new features. Overall, 174 
people contributed to this release completing 
+18 FLIPS and 700+ issues. Thank you!
+
+Let's dive into the highlights.
+
+# Towards a Streaming Lakehouse
+
+## Flink SQL Improvements
+
+### Introduce Flink JDBC Driver For SQL Gateway 
+
+Flink 1.18 comes with a JDBC Driver for the Flink SQL Gateway. So, you can now 
use any SQL Client that supports JDBC to 
+interact with your tables via Flink SQL. Here is an example using 
[SQLLine](https://julianhyde.github.io/sqlline/manual.html). 
+
+```shell
+sqlline> !connect jdbc:flink://localhost:8083
+```
+
+```shell
+sqlline version 1.12.0
+sqlline> !connect jdbc:flink://localhost:8083
+Enter username for jdbc:flink://localhost:8083:
+Enter password for jdbc:flink://localhost:8083:
+0: jdbc:flink://localhost:8083> CREATE TABLE T(
+. . . . . . . . . . . . . . .)>  a INT,
+. . . . . . . . . . . . . . .)>  b VARCHAR(10)
+. . . . . . . . . . . . . . .)>  ) WITH (
+. . . . . . . . . . . . . . .)>  'connector' = 'filesystem',
+. . . . . . . . . . . . . . .)>  'path' = 'file:///tmp/T.csv',
+. . . . . . . . . . . . . . .)>  'format' = 'csv'
+. . . . . . . . . . . . . . .)>  );
+No rows affected (0.122 seconds)
+0: jdbc:flink://localhost:8083> INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello');
++--+
+|  job id  |
++--+
+| fbade1ab4450fc57ebd5269fdf60dcfd |
++--+
+1 row selected (1.282 seconds)
+0: jdbc:flink://localhost:8083> SELECT * FROM T;
++---+---+
+| a |   b   |
++---+---+
+| 1 | Hi|
+| 2 | Hello |
++---+---+
+2 rows selected (1.955 seconds)
+0: jdbc:flink://localhost:8083>
+```
+
+**More Information**
+* 
[Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/jdbcdriver/)
 
+* [FLIP-293: Introduce Flink Jdbc Driver For SQL 
Gateway](https://cwiki.apache.org/confluence/display/FLINK/FLIP-293%3A+Introduce+Flink+Jdbc+Driver+For+Sql+Gateway)
+
+
+### Stored Procedure Support for Flink Connectors
+
+Stored procedures have been an indispensable tool in traditional databases,
+offering a convenient way to encapsulate complex logic for data manipulation
+and administrative tasks. They also offer the potential for enhanced
+performance, since they can trigger the handling of data operations directly
+within an external database. Other popular data systems like Trino and Iceberg
+automate and simplify common maintenance tasks into small sets of procedures,
+which greatly reduces users' administrative burden.
+
+This new update primarily targets developers of Flink connectors, who can now
+predefine custom stored procedures into connectors via the Catalog interface.
+The primary benefit to users is that connector-specific tasks that previously
+may have required writing custom Flink code can now be replaced with simple
+calls that encapsulate, standardize, and potentially optimize the underlying
+operations. Users can execute procedures using the familiar `CALL` syntax, and
+discover a connector's available procedures with `SHOW PROCEDURES`. Stored
+procedures within connectors improves the extensibility of Flink's SQL and
+Table APIs, and should unlock smoother data access and management for users.
+
+**More Information**
+* 
[Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/procedures/)
+* [FLIP-311: Support Call Stored 
Procedure](https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure)
+
+### Extended DDL Support
+
+From this release onwards, Flink supports
+
+- `REPLACE TABLE AS SELECT`
+- `CREATE OR REPLACE TABLE AS SELECT`
+
+and both these commands and previously supported `CREATE TABLE AS` can now 
support atomicity provided the underlying 
+connector also supports this.
+
+Moreover, Apache Flink now supports TRUNCATE TABLE in batch execution mode. 
Same as before, the underlying connector needs 
+to implement and provide this capability
+
+And, finally, we have also implemented support for adding, dropping and 
listing partitions via
+
+- `ALTER TABLE ADD PARTITION`
+- `ALTER TABLE DROP PARTITION`
+- `SHOW P

Re: [PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


caicancai commented on PR #688:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/688#issuecomment-177853

   https://github.com/apache/flink-kubernetes-operator/assets/77189278/817ec9f0-eb8a-4ea3-bb56-99a4f3131b43
   I performed mvn clean install -DskipTests -Pgenerate-docs.
   It also changed some files, and I am not sure whether I need to upload them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


tisonkun commented on PR #688:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/688#issuecomment-1778582314

   ```
   [INFO] 

   [INFO] Reactor Summary for Flink Kubernetes: 1.7-SNAPSHOT:
   [INFO] 
   [INFO] Flink Kubernetes: .. SUCCESS [  5.258 
s]
   [INFO] Flink Kubernetes Standalone  SUCCESS [ 23.994 
s]
   [INFO] Flink Kubernetes Operator Api .. SUCCESS [ 35.975 
s]
   [INFO] Flink Autoscaler ... SUCCESS [ 19.100 
s]
   [INFO] Flink Kubernetes Operator .. SUCCESS [06:56 
min]
   [INFO] Flink Kubernetes Webhook ... SUCCESS [ 16.950 
s]
   [INFO] Flink Kubernetes Docs .. SUCCESS [ 14.774 
s]
   [INFO] Flink SQL Runner Example ... SUCCESS [  7.986 
s]
   [INFO] Flink Beam Example . SUCCESS [ 43.310 
s]
   [INFO] Flink Kubernetes Client Code Example ... SUCCESS [ 10.857 
s]
   [INFO] Flink Autoscaler Test Job .. SUCCESS [  8.115 
s]
   [INFO] 

   [INFO] BUILD SUCCESS
   [INFO] 

   [INFO] Total time:  10:03 min
   [INFO] Finished at: 2023-10-25T06:11:12Z
   [INFO] 

   Please generate the java doc via 'mvn clean install -DskipTests 
-Pgenerate-docs' again
   Error: Process completed with exit code 1.
   ```
   
   The build succeeds but the check still fails. Why?
   
   cc @gyfora 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


caicancai commented on PR #688:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/688#issuecomment-1778580734

   I am working on this CI issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33356) The navigation bar on Flink’s official website is messed up.

2023-10-24 Thread Lijie Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779345#comment-17779345
 ] 

Lijie Wang commented on FLINK-33356:


[~Wencong Liu] Assigned to you.

> The navigation bar on Flink’s official website is messed up.
> 
>
> Key: FLINK-33356
> URL: https://issues.apache.org/jira/browse/FLINK-33356
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Junrui Li
>Assignee: Wencong Liu
>Priority: Major
> Attachments: image-2023-10-25-11-55-52-653.png, 
> image-2023-10-25-12-34-22-790.png
>
>
> The side navigation bar on the Flink official website at the following link: 
> [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed 
> up, as shown in the attached screenshot.
> !image-2023-10-25-11-55-52-653.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33356) The navigation bar on Flink’s official website is messed up.

2023-10-24 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-33356:
--

Assignee: Wencong Liu

> The navigation bar on Flink’s official website is messed up.
> 
>
> Key: FLINK-33356
> URL: https://issues.apache.org/jira/browse/FLINK-33356
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Junrui Li
>Assignee: Wencong Liu
>Priority: Major
> Attachments: image-2023-10-25-11-55-52-653.png, 
> image-2023-10-25-12-34-22-790.png
>
>
> The side navigation bar on the Flink official website at the following link: 
> [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed 
> up, as shown in the attached screenshot.
> !image-2023-10-25-11-55-52-653.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33356) The navigation bar on Flink’s official website is messed up.

2023-10-24 Thread Junrui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779341#comment-17779341
 ] 

Junrui Li commented on FLINK-33356:
---

[~Wencong Liu] Thank you for volunteering. [~wanglijie], could you help assign 
this ticket to Wencong?

> The navigation bar on Flink’s official website is messed up.
> 
>
> Key: FLINK-33356
> URL: https://issues.apache.org/jira/browse/FLINK-33356
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Junrui Li
>Priority: Major
> Attachments: image-2023-10-25-11-55-52-653.png, 
> image-2023-10-25-12-34-22-790.png
>
>
> The side navigation bar on the Flink official website at the following link: 
> [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed 
> up, as shown in the attached screenshot.
> !image-2023-10-25-11-55-52-653.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779340#comment-17779340
 ] 

Yun Tang commented on FLINK-33355:
--

[~edmond_j] Could you please share the code to reproduce this problem?

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33356) The navigation bar on Flink’s official website is messed up.

2023-10-24 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779330#comment-17779330
 ] 

Wencong Liu edited comment on FLINK-33356 at 10/25/23 5:58 AM:
---

Hello [~JunRuiLi], I found that this issue is caused by commit 
"30e8b3de05c1d6b75d8f27b9188a1d34f1589ac5", which modified the subproject 
commit. I think we should revert this change. Could you assign it to me?

!image-2023-10-25-12-34-22-790.png!


was (Author: JIRAUSER281639):
Hello [~JunRuiLi] , I found this case is due to the commit 
"30e8b3de05c1d6b75d8f27b9188a1d34f1589ac5", which modified the subproject 
commit. I think we should revert this change

!image-2023-10-25-12-34-22-790.png!

> The navigation bar on Flink’s official website is messed up.
> 
>
> Key: FLINK-33356
> URL: https://issues.apache.org/jira/browse/FLINK-33356
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Junrui Li
>Priority: Major
> Attachments: image-2023-10-25-11-55-52-653.png, 
> image-2023-10-25-12-34-22-790.png
>
>
> The side navigation bar on the Flink official website at the following link: 
> [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed 
> up, as shown in the attached screenshot.
> !image-2023-10-25-11-55-52-653.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779336#comment-17779336
 ] 

zhang commented on FLINK-33355:
---

[~yunta] I know that modifying the max parallelism is not possible, but I only 
changed the parallelism by using the parameter *parallelism.default*. In my 
tests, reducing it from n to m (where m < n and m > 1) is feasible. However, 
reducing it to 1 results in the aforementioned error. Nevertheless, I have a 
requirement to reduce the parallelism of a Flink job from n to 1.

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30768] [Project Website] flink-web version cleanup [flink-web]

2023-10-24 Thread via GitHub


victor9309 commented on PR #683:
URL: https://github.com/apache/flink-web/pull/683#issuecomment-1778542189

   Thanks @XComp for the review and for your advice.
   ```
   $ cat foo-utils.sh 
   function foo() {
 echo 'foo...' >&2
 exit 1
   }
   $ cat foo-main.sh 
   source ./foo-utils.sh
   
   v=$(foo)
   echo "$v"
   echo "end-"
   ```
   
![image](https://github.com/apache/flink-web/assets/18453843/a83ad090-1e5b-4862-8ca1-c9328ebe76dc)
   
   
   I found that exit 1 inside the function cannot exit the calling script (the 
command substitution runs foo in a subshell, so only the subshell exits), so I 
modified the logic. Please check it:
   ```
   $ cat foo-utils.sh 
   export TOP_PID=$$
   trap 'exit 1' TERM
   function foo() {
  echo 'foo...' >&2
  kill -s TERM $TOP_PID
   }
   
   $ cat foo-main.sh 
   source ./foo-utils.sh
   
   v=$(foo)
   echo "$v"
   echo "end-"
   ```
   
![image](https://github.com/apache/flink-web/assets/18453843/be4816f5-19fb-40ef-9916-6543cb3ba1df)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33357) add Apache Software License 2

2023-10-24 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen reassigned FLINK-33357:
-

Assignee: 蔡灿材

> add Apache Software License 2
> -
>
> Key: FLINK-33357
> URL: https://issues.apache.org/jira/browse/FLINK-33357
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: 蔡灿材
>Assignee: 蔡灿材
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0
>
> Attachments: 2023-10-25 12-08-58屏幕截图.png
>
>
> Currently flinkdeployments.flink.apache.org-v1.yml and 
> flinksessionjobs.flink.apache.org-v1.yml don't
> include the Apache Software License 2 header.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33358) Flink SQL Client fails to start in Flink on YARN

2023-10-24 Thread Prabhu Joseph (Jira)
Prabhu Joseph created FLINK-33358:
-

 Summary: Flink SQL Client fails to start in Flink on YARN
 Key: FLINK-33358
 URL: https://issues.apache.org/jira/browse/FLINK-33358
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN, Table SQL / Client
Affects Versions: 1.18.0
Reporter: Prabhu Joseph


Flink SQL Client fails to start in Flink on YARN with the error below:
{code:java}
flink-yarn-session -tm 2048 -s 2 -d

/usr/lib/flink/bin/sql-client.sh 

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Could not read from command line.
at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221)
at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:179)
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:121)
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:114)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118)
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.table.client.config.SqlClientOptions
at 
org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59)
at 
org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633)
at 
org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615)
at 
org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554)
at 
org.jline.reader.impl.LineReaderImpl.doCleanup(LineReaderImpl.java:2340)
at 
org.jline.reader.impl.LineReaderImpl.cleanup(LineReaderImpl.java:2332)
at 
org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:626)
at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:194)
... 7 more
{code}
The issue is due to the old jline jar from the Hadoop classpath 
(/usr/lib/hadoop-yarn/lib/jline-3.9.0.jar) taking precedence. Flink 1.18 
requires jline-3.21.0.jar.

Placing flink-sql-client.jar (bundled with jline-3.21) before the Hadoop 
classpath fixes the issue.
{code:java}
diff --git a/flink-table/flink-sql-client/bin/sql-client.sh 
b/flink-table/flink-sql-client/bin/sql-client.sh
index 24746c5dc8..4ab8635de2 100755
--- a/flink-table/flink-sql-client/bin/sql-client.sh
+++ b/flink-table/flink-sql-client/bin/sql-client.sh
@@ -89,7 +89,7 @@ if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then
 elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
 
 # start client with jar
-exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" 
-classpath "`manglePathList 
"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" 
org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath 
$FLINK_SQL_CLIENT_JAR`"
+exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" 
-classpath "`manglePathList 
"$CC_CLASSPATH:$FLINK_SQL_CLIENT_JAR:$INTERNAL_HADOOP_CLASSPATHS`" 
org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath 
$FLINK_SQL_CLIENT_JAR`"
 
 # write error message to stderr
 else
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33358) Flink SQL Client fails to start in Flink on YARN

2023-10-24 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-33358:
--
Description: 
Flink SQL Client fails to start in Flink on YARN with the error below:
{code:java}
flink-yarn-session -tm 2048 -s 2 -d

/usr/lib/flink/bin/sql-client.sh 

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Could not read from command line.
at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221)
at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:179)
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:121)
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:114)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118)
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.table.client.config.SqlClientOptions
at 
org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59)
at 
org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633)
at 
org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615)
at 
org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554)
at 
org.jline.reader.impl.LineReaderImpl.doCleanup(LineReaderImpl.java:2340)
at 
org.jline.reader.impl.LineReaderImpl.cleanup(LineReaderImpl.java:2332)
at 
org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:626)
at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:194)
... 7 more
{code}
The issue is due to the old jline jar from the Hadoop (3.3.3) classpath 
(/usr/lib/hadoop-yarn/lib/jline-3.9.0.jar) taking precedence. Flink 1.18 
requires jline-3.21.0.jar.

Placing flink-sql-client.jar (bundled with jline-3.21) before the Hadoop 
classpath fixes the issue.
{code:java}
diff --git a/flink-table/flink-sql-client/bin/sql-client.sh 
b/flink-table/flink-sql-client/bin/sql-client.sh
index 24746c5dc8..4ab8635de2 100755
--- a/flink-table/flink-sql-client/bin/sql-client.sh
+++ b/flink-table/flink-sql-client/bin/sql-client.sh
@@ -89,7 +89,7 @@ if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then
 elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
 
 # start client with jar
-exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" 
-classpath "`manglePathList 
"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" 
org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath 
$FLINK_SQL_CLIENT_JAR`"
+exec "$JAVA_RUN" $FLINK_ENV_JAVA_OPTS $JVM_ARGS "${log_setting[@]}" 
-classpath "`manglePathList 
"$CC_CLASSPATH:$FLINK_SQL_CLIENT_JAR:$INTERNAL_HADOOP_CLASSPATHS`" 
org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath 
$FLINK_SQL_CLIENT_JAR`"
 
 # write error message to stderr
 else
{code}

  was:
Flink SQL Client fails to start in Flink on YARN with below error
{code:java}
flink-yarn-session -tm 2048 -s 2 -d

/usr/lib/flink/bin/sql-client.sh 

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Could not read from command line.
at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221)
at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:179)
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:121)
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:114)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118)
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.table.client.config.SqlClientOptions
at 
org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59)
at 
org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633)
at 
org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615)
at 
org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554)
at 
org.jline.reader.impl.LineReaderImpl.doCleanup(LineReaderImpl.java:2340)
at 
org.jline.rea

[jira] [Commented] (FLINK-33356) The navigation bar on Flink’s official website is messed up.

2023-10-24 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779330#comment-17779330
 ] 

Wencong Liu commented on FLINK-33356:
-

Hello [~JunRuiLi], I found that this issue is caused by commit 
"30e8b3de05c1d6b75d8f27b9188a1d34f1589ac5", which modified the subproject 
commit. I think we should revert this change.

!image-2023-10-25-12-34-22-790.png!

> The navigation bar on Flink’s official website is messed up.
> 
>
> Key: FLINK-33356
> URL: https://issues.apache.org/jira/browse/FLINK-33356
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Junrui Li
>Priority: Major
> Attachments: image-2023-10-25-11-55-52-653.png, 
> image-2023-10-25-12-34-22-790.png
>
>
> The side navigation bar on the Flink official website at the following link: 
> [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed 
> up, as shown in the attached screenshot.
> !image-2023-10-25-11-55-52-653.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33356) The navigation bar on Flink’s official website is messed up.

2023-10-24 Thread Wencong Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wencong Liu updated FLINK-33356:

Attachment: image-2023-10-25-12-34-22-790.png

> The navigation bar on Flink’s official website is messed up.
> 
>
> Key: FLINK-33356
> URL: https://issues.apache.org/jira/browse/FLINK-33356
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Junrui Li
>Priority: Major
> Attachments: image-2023-10-25-11-55-52-653.png, 
> image-2023-10-25-12-34-22-790.png
>
>
> The side navigation bar on the Flink official website at the following link: 
> [https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed 
> up, as shown in the attached screenshot.
> !image-2023-10-25-11-55-52-653.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33090][checkpointing] CheckpointsCleaner clean individual chec… [flink]

2023-10-24 Thread via GitHub


yigress commented on PR #23425:
URL: https://github.com/apache/flink/pull/23425#issuecomment-1778479885

   @pnowojski I rebased and it kicked off a rerun successfully. I also ran some 
jobs for a day without problems. If it looks good, can you help merge it too? 
Thank you so much!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33357) add Apache Software License 2

2023-10-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33357:
---
Labels: pull-request-available  (was: )

> add Apache Software License 2
> -
>
> Key: FLINK-33357
> URL: https://issues.apache.org/jira/browse/FLINK-33357
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: 蔡灿材
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0
>
> Attachments: 2023-10-25 12-08-58屏幕截图.png
>
>
> Currently flinkdeployments.flink.apache.org-v1.yml and 
> flinksessionjobs.flink.apache.org-v1.yml don't
> include the Apache Software License 2 header.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33357] add Apache Software License 2 [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


caicancai opened a new pull request, #688:
URL: https://github.com/apache/flink-kubernetes-operator/pull/688

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request adds a new feature to periodically create 
and maintain savepoints through the `FlinkDeployment` custom resource.)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Periodic savepoint trigger is introduced to the custom resource*
 - *The operator checks on reconciliation whether the required time has 
passed*
 - *The JobManager's dispose savepoint API is used to clean up obsolete 
savepoints*
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33357) add Apache Software License 2

2023-10-24 Thread Jira
蔡灿材 created FLINK-33357:
---

 Summary: add Apache Software License 2
 Key: FLINK-33357
 URL: https://issues.apache.org/jira/browse/FLINK-33357
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: 蔡灿材
 Fix For: kubernetes-operator-1.5.0
 Attachments: 2023-10-25 12-08-58屏幕截图.png

Currently flinkdeployments.flink.apache.org-v1.yml and 
flinksessionjobs.flink.apache.org-v1.yml don't

include the Apache Software License 2 header.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779315#comment-17779315
 ] 

Yun Tang commented on FLINK-33355:
--

Changing the max parallelism (as opposed to the parallelism) breaks checkpoint 
compatibility; this is by design. You can refer to 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism
 for more details.
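
A minimal sketch of pinning the maximum parallelism explicitly so it stays stable across savepoint and restore (the value 128 matches the default reported in the error and is an assumption here):
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The max parallelism must not change between taking the savepoint and
// restoring from it; the runtime parallelism can then be rescaled freely.
env.setMaxParallelism(128);
env.setParallelism(1);
{code}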

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-33355.

Resolution: Information Provided

> can't reduce the parallelism from 'n' to '1' when recovering through a 
> savepoint.
> -
>
> Key: FLINK-33355
> URL: https://issues.apache.org/jira/browse/FLINK-33355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
> Environment: flink 1.17.1
>Reporter: zhang
>Priority: Major
>
> If the program includes window operators, it is not possible to reduce 
> the parallelism of the operators from n to 1 when restarting from a 
> savepoint, and it will result in an error: 
> {code:java}
> //IllegalStateException: Failed to rollback to checkpoint/savepoint 
> Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint 
> state and new program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 
> with max parallelism 128 to new program with max parallelism 1. This 
> indicates that the program has been changed in a non-compatible way after the 
> checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33356) The navigation bar on Flink’s official website is messed up.

2023-10-24 Thread Junrui Li (Jira)
Junrui Li created FLINK-33356:
-

 Summary: The navigation bar on Flink’s official website is messed 
up.
 Key: FLINK-33356
 URL: https://issues.apache.org/jira/browse/FLINK-33356
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Junrui Li
 Attachments: image-2023-10-25-11-55-52-653.png

The side navigation bar on the Flink official website at the following link: 
[https://nightlies.apache.org/flink/flink-docs-master/] appears to be messed 
up, as shown in the attached screenshot.

!image-2023-10-25-11-55-52-653.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33316][runtime] Avoid unnecessary heavy getStreamOperatorFactory [flink]

2023-10-24 Thread via GitHub


1996fanrui commented on code in PR #23550:
URL: https://github.com/apache/flink/pull/23550#discussion_r1371104358


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##
@@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator<?> operator) {
 
public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
 if (factory != null) {
-toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+config.setString(SERIALIZED_UDF_CLASS_NAME, 
factory.getClass().getName());

Review Comment:
   Hi @pnowojski , thanks for your analysis!
   
   > I would move: 
SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)
   > check, into the boolean 
StreamConfig#isSinkWriterOperatorFactory(Class<...> ...) method. 
   > It doesn't fit there very well, BUT at least it would justify why we have 
the checkState in the 
   > StreamConfig#setStreamOperatorFactory.
   
   The solution1 
(`SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)`
 ) is still fragile, right?
   
   If someone `makes SinkWriterOperatorFactory non final and implements a 
subclass` in the future, it still cannot be supported, and it would fail silently.
   
   > toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
   > config.setString(IS_INSTANCE_OF_SinkWriterOperatorFactory, factory instanceof SinkWriterOperatorFactory);
   > would be better/cleaner. Either one is fine for me.
   
   Solution2 is perfectly compatible with the case of adding a subclass.
   
   However, as I said before, the `getStreamOperatorFactory` is called in 
`toString` to print the class name, and I'd like to use 
`SERIALIZED_UDF_CLASS_NAME` instead of `getStreamOperatorFactory`.
   
   If we just keep `IS_INSTANCE_OF_SinkWriterOperatorFactory`, we must call 
`getStreamOperatorFactory` in the `toString` method.
   
   Or should we add the `IS_INSTANCE_OF_SinkWriterOperatorFactory` and 
`SERIALIZED_UDF_CLASS_NAME` together? A sketch of that variant follows.
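   
   For reference, a minimal sketch of that combined variant (the `IS_SINK_WRITER_OPERATOR_FACTORY` key is illustrative, not an existing constant):
   
   ```java
   // Illustrative sketch: store the class name (for a cheap toString())
   // plus an instance-of flag (robust against future subclasses).
   public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
       if (factory != null) {
           toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
           config.setString(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName());
           config.setBoolean(
                   IS_SINK_WRITER_OPERATOR_FACTORY,
                   factory instanceof SinkWriterOperatorFactory);
       }
   }
   ```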
   
   
   Actually, I had solution3 before I created this PR:
   
   We store the `SERIALIZED_UDF_CLASS` instead of `SERIALIZED_UDF_CLASS_NAME`.
   
   ```
   # setStreamOperatorFactory method
   toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
   toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass());
   ```
   
   
   ```
   public <F extends StreamOperatorFactory<?>> Class<F> getStreamOperatorFactoryClass(ClassLoader cl) {
       try {
           return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF_CLASS, cl);
       } catch (Exception e) {
           throw new StreamTaskException("Could not instantiate chained outputs.", e);
       }
   }
   ```
   
   And check `isAssignableFrom`:
   
   ```
   
SinkWriterOperatorFactory.class.isAssignableFrom(getStreamOperatorFactoryClass(SinkWriterOperatorFactory.class.getClassLoader()));
   ```
   
   Solution3 is fine; however, I'm worried that when there are multiple 
classloaders, the judgment may be wrong.
   
   That's why this PR stores the ClassName instead of the Class.
   
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] The io.disk package of flink-runtime module [flink]

2023-10-24 Thread via GitHub


Jiabao-Sun commented on PR #23572:
URL: https://github.com/apache/flink/pull/23572#issuecomment-1778455547

   Hi @RocMarshal, please help review it when you have time.
   Thanks :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] Module: The io package of flink-runtime [flink]

2023-10-24 Thread via GitHub


RocMarshal commented on PR #23200:
URL: https://github.com/apache/flink/pull/23200#issuecomment-1778448738

   > This PR is too huge to review. I will split it into multiple PRs.
   
   SGTM +1.
   Looking forward to it~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33355) can't reduce the parallelism from 'n' to '1' when recovering through a savepoint.

2023-10-24 Thread zhang (Jira)
zhang created FLINK-33355:
-

 Summary: can't reduce the parallelism from 'n' to '1' when 
recovering through a savepoint.
 Key: FLINK-33355
 URL: https://issues.apache.org/jira/browse/FLINK-33355
 Project: Flink
  Issue Type: Bug
  Components: API / Core
 Environment: flink 1.17.1
Reporter: zhang


If the program includes window operators, it is not possible to reduce the 
parallelism of the operators from n to 1 when restarting from a savepoint, and 
it will result in an error: 
{code:java}
//IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint 
Metadata. Max parallelism mismatch between checkpoint/savepoint state and new 
program. Cannot map operator 0e059b9f403cf6f35592ab773c9408d4 with max 
parallelism 128 to new program with max parallelism 1. This indicates that the 
program has been changed in a non-compatible way after the 
checkpoint/savepoint. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33354) Reuse the TaskInformation for multiple slots

2023-10-24 Thread Rui Fan (Jira)
Rui Fan created FLINK-33354:
---

 Summary: Reuse the TaskInformation for multiple slots
 Key: FLINK-33354
 URL: https://issues.apache.org/jira/browse/FLINK-33354
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Affects Versions: 1.17.1, 1.18.0
Reporter: Rui Fan
Assignee: Rui Fan


The background is similar to FLINK-33315.

There is a hive table with a lot of data, and its HiveSource#partitionBytes is 
281MB. When slotPerTM = 4, one TM will run 4 HiveSources at the same time.

 

How does the TaskExecutor submit a large task?
 # TaskExecutor#loadBigData reads all bytes from the file into a 
SerializedValue<TaskInformation>
 ** The SerializedValue has a byte[]
 ** It costs heap memory
 ** It will be greater than 281 MB, because it stores not only 
HiveSource#partitionBytes but also the other information of TaskInformation.
 # The TaskInformation is generated from the SerializedValue
 ** TaskExecutor#submitTask calls 
tdd.getSerializedTaskInformation().deserializeValue()
 ** tdd.getSerializedTaskInformation() is a SerializedValue<TaskInformation>
 ** It generates the TaskInformation
 ** TaskInformation includes the Configuration taskConfiguration
 ** The taskConfiguration includes StreamConfig#SERIALIZEDUDF

Based on the above process, TM memory holds 2 big byte arrays for each task:
 * The SerializedValue
 * The TaskInformation

When one TM runs 4 HiveSources at the same time, it holds 8 big byte arrays.

In our production environment, this situation also often leads to TM OOM.
h2. Solution:

The data is identical because the PermanentBlobKey is the same, so we can add a 
cache for it to reduce the memory and CPU cost. A sketch of such a cache 
follows.
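
A minimal sketch, assuming a soft-reference map keyed by the blob key (the class name TaskInformationCache and its API are illustrative, not the actual Flink implementation):
{code:java}
import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Tasks sharing the same PermanentBlobKey reuse one deserialized value
// instead of each holding its own copy of the large byte array.
final class TaskInformationCache<K, V> {
    private final Map<K, SoftReference<V>> cache = new ConcurrentHashMap<>();

    V get(K blobKey, Supplier<V> deserializer) {
        SoftReference<V> ref = cache.get(blobKey);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {
            value = deserializer.get(); // deserialize once per blob key
            cache.put(blobKey, new SoftReference<>(value));
        }
        return value;
    }
}
{code}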



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner

2023-10-24 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779300#comment-17779300
 ] 

Xin Chen edited comment on FLINK-26603 at 10/25/23 2:58 AM:


[~luoyuxia] OK, thank you very much. :D (y) I will test everything with 
flink-table-planner.


was (Author: JIRAUSER298666):
[~luoyuxia] Ok,thank you very much. :D(y)

> [Umbrella] Decouple Hive with Flink planner
> ---
>
> Key: FLINK-26603
> URL: https://issues.apache.org/jira/browse/FLINK-26603
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Table SQL / Planner
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
> Fix For: 1.18.0
>
>
> To support Hive dialect with Flink, we have implemented FLIP-123, FLIP-152.
> But it also brings a lot of maintenance burden and complexity, because it 
> mixes some Hive-specific logic into the Flink planner. We should remove such 
> logic from the Flink planner and make it fully decoupled from the Flink planner.
> With this ticket, we expect:
> 1:  there won't be any specific logic to Hive in planner module
> 2:  remove  flink-sql-parser-hive from flink-table module 
> 3:  remove the planner dependency in flink-connector-hive
> I'll update more details after investigation.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner

2023-10-24 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779300#comment-17779300
 ] 

Xin Chen commented on FLINK-26603:
--

[~luoyuxia] OK, thank you very much. :D (y)

> [Umbrella] Decouple Hive with Flink planner
> ---
>
> Key: FLINK-26603
> URL: https://issues.apache.org/jira/browse/FLINK-26603
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Table SQL / Planner
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
> Fix For: 1.18.0
>
>
> To support Hive dialect with Flink, we have implemented FLIP-123, FLIP-152.
> But it also brings a lot of maintenance burden and complexity, because it 
> mixes some Hive-specific logic into the Flink planner. We should remove such 
> logic from the Flink planner and make it fully decoupled from the Flink planner.
> With this ticket, we expect:
> 1:  there won't be any specific logic to Hive in planner module
> 2:  remove  flink-sql-parser-hive from flink-table module 
> 3:  remove the planner dependency in flink-connector-hive
> I'll update more details after investigation.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner

2023-10-24 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779299#comment-17779299
 ] 

luoyuxia commented on FLINK-26603:
--

[~xinchen147] Yes, you're right. There is nothing special about 
table-planner-loader; it is just used to hide the table-planner (which depends on 
Scala) behind another classloader. There shouldn't be any problem even if you are 
not going to use the Hive dialect.
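As a toy illustration of that isolation (not the actual flink-table-planner-loader 
code; the jar path and entry-point class name below are placeholders), loading 
planner classes through a child classloader keeps their Scala dependencies off 
the application classpath:

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class PlannerLoaderSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder jar location; in Flink the planner jar is bundled
        // inside the loader artifact.
        URL plannerJar = new URL("file:///path/to/flink-table-planner.jar");
        try (URLClassLoader isolated =
                new URLClassLoader(new URL[] {plannerJar}, ClassLoader.getSystemClassLoader())) {
            // Placeholder class name; it is resolved inside the child
            // classloader only, so its dependencies never leak out.
            Class<?> entryPoint =
                    Class.forName("org.example.PlannerEntryPoint", true, isolated);
            System.out.println("Loaded " + entryPoint.getName() + " in isolation");
        }
    }
}
{code}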

> [Umbrella] Decouple Hive with Flink planner
> ---
>
> Key: FLINK-26603
> URL: https://issues.apache.org/jira/browse/FLINK-26603
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Table SQL / Planner
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
> Fix For: 1.18.0
>
>
> To support the Hive dialect in Flink, we have implemented FLIP-123 and FLIP-152.
> But this also brings a heavy maintenance burden and complexity, because it mixes
> Hive-specific logic into the Flink planner. We should remove such logic from the
> Flink planner and decouple Hive from it completely.
> With this ticket, we expect:
> 1: there won't be any Hive-specific logic in the planner module
> 2: remove flink-sql-parser-hive from the flink-table module
> 3: remove the planner dependency in flink-connector-hive
> I'll update more details after investigation.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] Module: The io package of flink-runtime [flink]

2023-10-24 Thread via GitHub


Jiabao-Sun closed pull request #23200: [FLINK-32850][flink-runtime][JUnit5 
Migration] Module: The io package of flink-runtime
URL: https://github.com/apache/flink/pull/23200


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33315) Optimize memory usage of large StreamOperator

2023-10-24 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33315:

Component/s: Runtime / Task

> Optimize memory usage of large StreamOperator
> -
>
> Key: FLINK-33315
> URL: https://issues.apache.org/jira/browse/FLINK-33315
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration, Runtime / Task
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: 
> 130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b.png,
>  image-2023-10-19-16-28-16-077.png
>
>
> Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TMs
> always fail with java.lang.OutOfMemoryError: Java heap space.
>  
> Here is an example: a Hive table with a lot of data, where
> HiveSource#partitionBytes is 281MB.
> After analysis, the root cause is that the TM maintains this big object in 3
> replicas:
>  * Replica_1: SourceOperatorFactory (necessary for running the task)
>  * Replica_2: a temporarily generated duplicate SourceOperatorFactory object.
>  ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code
> link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
>  ** When creating a successor operator to a SourceOperator, the call stack is:
>  *** OperatorChain#createOperatorChain ->
>  *** wrapOperatorIntoOutput ->
>  *** getOperatorRecordsOutCounter ->
>  *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
>  ** It generates the SourceOperatorFactory temporarily, just to check
> whether it is a SinkWriterOperatorFactory
>  * Replica_3: the value of StreamConfig#{color:#9876aa}SERIALIZEDUDF{color}
>  ** It is used to generate the SourceOperatorFactory.
>  ** Currently the value is always kept in heap memory.
>  ** However, after generation we can release it, or store it on disk if
> needed.
>  *** We can define a threshold: when the value size is below the threshold,
> the release strategy does not take effect.
>  ** This would save a lot of heap memory.
> These three replicas use about 800MB of memory. Note that this is just
> one subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same
> time, so 12 replicas are maintained in TM memory, about 3.3 GB.
> These large objects in the JVM cannot be reclaimed, causing the TM to
> frequently OOM.
> This JIRA focuses on optimizing Replica_2 and Replica_3.
>  
> !image-2023-10-19-16-28-16-077.png!
>  
> !https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93!
>  
>  
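A minimal sketch of one way to address Replica_2, assuming a hypothetical 
deserializeFactory helper standing in for the existing SERIALIZEDUDF 
deserialization path: deserialize the factory at most once per config and let the 
type check reuse the cached instance.

{code:java}
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;

// Sketch only: cache the deserialized factory so callers that merely
// inspect its type do not rebuild the big object from SERIALIZEDUDF.
abstract class CachingOperatorConfigSketch {
    private StreamOperatorFactory<?> cachedFactory;

    /** Hypothetical stand-in for the existing deserialization path. */
    protected abstract StreamOperatorFactory<?> deserializeFactory(ClassLoader cl);

    synchronized StreamOperatorFactory<?> getStreamOperatorFactory(ClassLoader cl) {
        if (cachedFactory == null) {
            cachedFactory = deserializeFactory(cl); // expensive, now done at most once
        }
        return cachedFactory;
    }
}
{code}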



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32850][flink-runtime][JUnit5 Migration] Module: The io package of flink-runtime [flink]

2023-10-24 Thread via GitHub


Jiabao-Sun commented on PR #23200:
URL: https://github.com/apache/flink/pull/23200#issuecomment-1778414947

   This PR is too huge to review.
   I will split it into multiple PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26603) [Umbrella] Decouple Hive with Flink planner

2023-10-24 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779298#comment-17779298
 ] 

Xin Chen commented on FLINK-26603:
--

Hi, [~luoyuxia], thank you. Yes, since the table-planner-loader module was 
introduced to decouple Scala, and I have no specific requirements for the Scala 
version, I am considering fully replacing table-planner-loader with table-planner 
in all scenarios, including those that do not involve the Hive dialect. Please 
allow me to confirm once more: what concerns me is the impact or risk, for other 
functionality or for performance, of completely using this module when the Hive 
dialect is not involved. I think it should be completely fine; after all, the 
table-planner-loader module actually uses the table-planner under the hood. The 
only difference may be that the classloader mechanism of the loader module is no 
longer used. Is there any problem with this? I think probably not.

> [Umbrella] Decouple Hive with Flink planner
> ---
>
> Key: FLINK-26603
> URL: https://issues.apache.org/jira/browse/FLINK-26603
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Table SQL / Planner
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
> Fix For: 1.18.0
>
>
> To support the Hive dialect in Flink, we have implemented FLIP-123 and FLIP-152.
> But this also brings a heavy maintenance burden and complexity, because it mixes
> Hive-specific logic into the Flink planner. We should remove such logic from the
> Flink planner and decouple Hive from it completely.
> With this ticket, we expect:
> 1: there won't be any Hive-specific logic in the planner module
> 2: remove flink-sql-parser-hive from the flink-table module
> 3: remove the planner dependency in flink-connector-hive
> I'll update more details after investigation.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33315) Optimize memory usage of large StreamOperator

2023-10-24 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33315:

Attachment: 
130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b.png

> Optimize memory usage of large StreamOperator
> -
>
> Key: FLINK-33315
> URL: https://issues.apache.org/jira/browse/FLINK-33315
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: 
> 130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b.png,
>  image-2023-10-19-16-28-16-077.png
>
>
> Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TMs
> always fail with java.lang.OutOfMemoryError: Java heap space.
>  
> Here is an example: a Hive table with a lot of data, where
> HiveSource#partitionBytes is 281MB.
> After analysis, the root cause is that the TM maintains this big object in 3
> replicas:
>  * Replica_1: SourceOperatorFactory (necessary for running the task)
>  * Replica_2: a temporarily generated duplicate SourceOperatorFactory object.
>  ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code
> link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
>  ** When creating a successor operator to a SourceOperator, the call stack is:
>  *** OperatorChain#createOperatorChain ->
>  *** wrapOperatorIntoOutput ->
>  *** getOperatorRecordsOutCounter ->
>  *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
>  ** It generates the SourceOperatorFactory temporarily, just to check
> whether it is a SinkWriterOperatorFactory
>  * Replica_3: the value of StreamConfig#{color:#9876aa}SERIALIZEDUDF{color}
>  ** It is used to generate the SourceOperatorFactory.
>  ** Currently the value is always kept in heap memory.
>  ** However, after generation we can release it, or store it on disk if
> needed.
>  *** We can define a threshold: when the value size is below the threshold,
> the release strategy does not take effect.
>  ** This would save a lot of heap memory.
> These three replicas use about 800MB of memory. Note that this is just
> one subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same
> time, so 12 replicas are maintained in TM memory, about 3.3 GB.
> These large objects in the JVM cannot be reclaimed, causing the TM to
> frequently OOM.
> This JIRA focuses on optimizing Replica_2 and Replica_3.
>  
> !image-2023-10-19-16-28-16-077.png!
>  
> !https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33315) Optimize memory usage of large StreamOperator

2023-10-24 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33315:

Description: 
Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TMs 
always fail with java.lang.OutOfMemoryError: Java heap space.

Here is an example: a Hive table with a lot of data, where 
HiveSource#partitionBytes is 281MB.

After analysis, the root cause is that the TM maintains this big object in 3 
replicas:
 * Replica_1: SourceOperatorFactory (necessary for running the task)
 * Replica_2: a temporarily generated duplicate SourceOperatorFactory object.
 ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code 
link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
 ** When creating a successor operator to a SourceOperator, the call stack is:
 *** OperatorChain#createOperatorChain ->
 *** wrapOperatorIntoOutput ->
 *** getOperatorRecordsOutCounter ->
 *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
 ** It generates the SourceOperatorFactory temporarily, just to check whether it 
is a SinkWriterOperatorFactory
 * Replica_3: the value of StreamConfig#{color:#9876aa}SERIALIZEDUDF{color}
 ** It is used to generate the SourceOperatorFactory.
 ** Currently the value is always kept in heap memory.
 ** However, after generation we can release it, or store it on disk if needed.
 *** We can define a threshold: when the value size is below the threshold, the 
release strategy does not take effect.
 ** This would save a lot of heap memory.

These three replicas use about 800MB of memory. Note that this is just one 
subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same time, so 12 
replicas are maintained in TM memory, about 3.3 GB.

These large objects in the JVM cannot be reclaimed, causing the TM to frequently 
OOM.

This JIRA focuses on optimizing Replica_2 and Replica_3.
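A minimal sketch of the threshold idea for Replica_3, with hypothetical names: 
keep small values on the heap, but spill large ones to disk, drop the heap copy 
after use, and reload lazily on the next access.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch only: threshold-based release of a large serialized value,
// e.g. the SERIALIZEDUDF bytes. All names here are hypothetical.
final class ReleasableSerializedValue {
    private static final int RELEASE_THRESHOLD_BYTES = 1 << 20; // illustrative: 1 MiB

    private byte[] bytes;   // in-heap copy; null once released
    private Path spillFile; // on-disk copy, written lazily

    ReleasableSerializedValue(byte[] bytes) {
        this.bytes = bytes;
    }

    /** Returns the serialized bytes, reloading them from disk if released. */
    synchronized byte[] get() throws IOException {
        if (bytes == null) {
            bytes = Files.readAllBytes(spillFile);
        }
        return bytes;
    }

    /** Drops the heap copy if it is large enough to be worth spilling. */
    synchronized void releaseIfLarge() throws IOException {
        if (bytes != null && bytes.length >= RELEASE_THRESHOLD_BYTES) {
            if (spillFile == null) {
                spillFile = Files.createTempFile("serialized-udf-", ".bin");
                Files.write(spillFile, bytes);
            }
            bytes = null; // the heap copy can now be garbage collected
        }
    }
}
{code}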

 

!image-2023-10-19-16-28-16-077.png!

 

!https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b010220e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93!

 

 

  was:
Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TMs 
always fail with java.lang.OutOfMemoryError: Java heap space.

Here is an example: a Hive table with a lot of data, where 
HiveSource#partitionBytes is 281MB.

After analysis, the root cause is that the TM maintains this big object in 3 
replicas:
 * Replica_1: SourceOperatorFactory (necessary for running the task)
 * Replica_2: a temporarily generated duplicate SourceOperatorFactory object.
 ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code 
link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
 ** When creating a successor operator to a SourceOperator, the call stack is:
 *** OperatorChain#createOperatorChain ->
 *** wrapOperatorIntoOutput ->
 *** getOperatorRecordsOutCounter ->
 *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
 ** It generates the SourceOperatorFactory temporarily, just to check whether it 
is a SinkWriterOperatorFactory
 * Replica_3: the value of StreamConfig#{color:#9876aa}SERIALIZEDUDF{color}
 ** It is used to generate the SourceOperatorFactory.
 ** Currently the value is always kept in heap memory.
 ** However, after generation we can release it, or store it on disk if needed.
 *** We can define a threshold: when the value size is below the threshold, the 
release strategy does not take effect.
 ** This would save a lot of heap memory.

These three replicas use about 800MB of memory. Note that this is just one 
subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same time, so 12 
replicas are maintained in TM memory, about 3.3 GB.

These large objects in the JVM cannot be reclaimed, causing the TM to frequently 
OOM.

This JIRA focuses on optimizing Replica_2 and Replica_3.

 

!image-2023-10-19-16-28-16-077.png!

 

!https://f.haiserve.com/download/5366d5f07c07a00116b148c6fa1ebff00b01021cc3da0438a0860702016976849360726a?userid=146850&token=d4a7e7d617dc71ea28bf02977333e1a8|width=1935,height=1127!

 

 


> Optimize memory usage of large StreamOperator
> -
>
> Key: FLINK-33315
> URL: https://issues.apache.org/jira/browse/FLINK-33315
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: image-2023-10-19-16-28-16-077.png
>
>
> Some of our batch jobs are upgr

Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]

2023-10-24 Thread via GitHub


afedulov commented on PR #23395:
URL: https://github.com/apache/flink/pull/23395#issuecomment-1778150546

   > That being said, @afedulov, do you think it's worthwhile bringing up the 
new feature on the mailing list to discuss?
   
   This was my initial thought, yes. Ideally we do not want to introduce 
functionality for very niche use cases, but this one makes sense to me, 
especially for building demos etc. Although this change, in my opinion, does not 
warrant a FLIP, I think it still makes sense to hold a quick vote on the dev 
mailing list. The idea would be to prepend the topic with [VOTE], briefly 
describe the proposal, why it is useful, and the downsides of it not being best 
practice (Ryan's concerns). If no one comments, that is a silent yes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1778137755

   @zentol thanks a lot for the review!! 
   I addressed all comments from your first pass, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370875073


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * <p>This generator function serializes the elements using Flink's type information. That way,
+ * any object transport using Java serialization will not be affected by the serializability of
+ * the elements.
+ *
+ * <p>NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+        implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+    /** The (de)serializer to be used for the data elements. */
+    private @Nullable TypeSerializer<OUT> serializer;
+
+    /** The actual data elements, in serialized form. */
+    private byte[] elementsSerialized;
+
+    /** The number of elements emitted already. */
+    private int numElementsEmitted;
+
+    private final transient Iterable<OUT> elements;
+    private transient DataInputView input;
+
+    public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements)
+            throws IOException {
+        this(serializer, Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements)
+            throws IOException {
+        this.serializer = Preconditions.checkNotNull(serializer);
+        this.elements = elements;
+        serializeElements();
+    }
+
+    @SafeVarargs
+    public FromElementsGeneratorFunction(OUT... elements) {
+        this(Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(Iterable<OUT> elements) {
+        this.serializer = null;
+        this.elements = elements;
+        checkIterable(elements, Object.class);
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public TypeSerializer<OUT> getSerializer() {
+        return serializer;
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        LOG.info("Serializing elements using {}", serializer);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+
+        try {
+            for (OUT element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (Exception e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    @Override
+    public void open(SourceReaderContext readerContext) throws Exception {
+ByteArrayInputSt

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370869459


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java:
##
@@ -27,7 +27,10 @@
  * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
  * cases where the output type is specified by the returns method and, thus, 
after the stream
  * operator has been created.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead
  */
+@Deprecated

Review Comment:
   Nice trick :) Done 
https://github.com/apache/flink/pull/23553/commits/e0f20410d65ce9ddf5c674e54d89947d5c5ceca3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370869459


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java:
##
@@ -27,7 +27,10 @@
  * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
  * cases where the output type is specified by the returns method and, thus, 
after the stream
  * operator has been created.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead
  */
+@Deprecated

Review Comment:
   Nice trick :) Done [`e0f2041` 
(#23553)](https://github.com/apache/flink/pull/23553/commits/e0f20410d65ce9ddf5c674e54d89947d5c5ceca3)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##
@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
 int maxParallelism = 42;
 
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream input1 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(128);
-DataStream input2 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(129);
+DataStream input1 = env.fromSequence(1, 
4).setMaxParallelism(128);
+DataStream input2 = env.fromSequence(1, 
4).setMaxParallelism(129);

Review Comment:
   Yes, this fixes test failures that arise because of this change: 
https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307
   The SingleOutputStreamOperator caps max parallelism to 1, while the current 
implementation of `fromSequence`, somewhat inconsistently, allows parallel 
execution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]

2023-10-24 Thread via GitHub


snuyanzin commented on PR #23519:
URL: https://github.com/apache/flink/pull/23519#issuecomment-1778108982

   Thanks for the contribution. In general it looks OK from my side; I left a 
couple of minor comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]

2023-10-24 Thread via GitHub


snuyanzin commented on code in PR #23519:
URL: https://github.com/apache/flink/pull/23519#discussion_r1370858120


##
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java:
##
@@ -66,12 +70,17 @@
  *   
  * 
  *
- * Once Flink applies same logic for both table api and sql, this class should 
be removed.
+ * Once Flink applies same logic for both table api and sql, this first 
changes should be
+ * removed.
+ *
+ * 2. It uses PEEK_FIELDS_NO_EXPAND with a nested struct type (Flink 
[[RowType]]).

Review Comment:
   ```suggestion
* 2. It uses {@link PEEK_FIELDS_NO_EXPAND} with a nested struct type 
(Flink [[{@link RowType}]]).
   ```
   I wonder whether using links could improve navigation in the documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]

2023-10-24 Thread via GitHub


snuyanzin commented on code in PR #23519:
URL: https://github.com/apache/flink/pull/23519#discussion_r1370857278


##
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java:
##
@@ -66,12 +70,17 @@
  *   
  * 
  *
- * Once Flink applies same logic for both table api and sql, this class should 
be removed.
+ * Once Flink applies same logic for both table api and sql, this first 
changes should be
+ * removed.
+ *
+ * 2. It uses PEEK_FIELDS_NO_EXPAND with a nested struct type (Flink 
[[RowType]]).
+ *
+ * See more at {@code LogicalRelDataTypeConverter} and {@code 
FlinkTypeFactory}.

Review Comment:
   ```suggestion
* See more at {@link LogicalRelDataTypeConverter} and {@link 
FlinkTypeFactory}.
   ```
   I guess it's better to use `@link` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757728


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type 
information. That way, any
+ * object transport using Java serialization will not be affected by the 
serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.

Review Comment:
   >Theoretically we could also fix this (in a follow-up!) but it doesn't seem 
worth the overhead given the number of elements. Oh I think this already works; 
see below comment.
   
   Good point. I mainly decided to restrict it because this is what the current 
underlying SourceFunction (FromElementsFunction) was delivering. I am not sure 
whether there were any specific additional considerations that required limiting 
it to a parallelism of 1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370845998


##
flink-tests/pom.xml:
##
@@ -284,6 +284,13 @@ under the License.
test

 
+   
+   org.apache.flink
+   flink-avro

Review Comment:
   > I cant really tell how this (and some of the flink-tests changes) related 
to fromElements.
   
   That's actually an easy one. You might remember this - 
https://issues.apache.org/jira/browse/FLINK-21386 . I added the test to 
explicitly verify that the issue you ran into with Avro and `fromElements` is 
handled correctly after the addition of the `OutputTypeConfigurable` 
functionality to `DataGeneratorSource`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370845998


##
flink-tests/pom.xml:
##
@@ -284,6 +284,13 @@ under the License.
test

 
+   
+   org.apache.flink
+   flink-avro

Review Comment:
   > I cant really tell how this (and some of the flink-tests changes) related 
to fromElements.
   
   That's actually an easy one. You might remember this: 
https://issues.apache.org/jira/browse/FLINK-21386 . I added the test to 
explicitly verify that the issue you ran into with Avro and `fromElements` is 
handled correctly after the addition of the `OutputTypeConfigurable` 
functionality to `DataGeneratorSource`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370841305


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
 }
 
 private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource dataStream = 
util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   The FLIP-27 source gets chained, while the test requires a non chainable 
stream (compare with `createChainableStream`).  In this PR I dealt with it by 
switching to fromCollection which is still based on the `SourceFunction`. In 
the follow-up PR for the `fromCollection` migration I had to add a legacy 
source:
   
https://github.com/apache/flink/pull/23558/files#diff-0e02bf442f990b526e7a5fe5203eff9e0d19924419b63d0bb0aa573f2b55R119
   Not sure if we can get a `nonChainableStream` with a FLIP-27 source.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370841305


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
 }
 
 private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource dataStream = 
util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   The FLIP-27 source gets chained, while the test requires a non chainable 
stream (compare with `createChainableStream`).  In this PR I dealt with it by 
switching to fromCollection that is still based on the `SourceFunction`. In the 
follow-up PR for the `fromCollection` migration I had to add a legacy source:
   
https://github.com/apache/flink/pull/23558/files#diff-0e02bf442f990b526e7a5fe5203eff9e0d19924419b63d0bb0aa573f2b55R119
   Not sure if we can get a `nonChainableStream` with a FLIP-27 source.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370832543


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism(
 configuration.set(
 
PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
 chainingOfOperatorsWithDifferentMaxParallelismEnabled);
-configuration.set(PipelineOptions.MAX_PARALLELISM, 10);
+configuration.set(PipelineOptions.MAX_PARALLELISM, 1);
 try (StreamExecutionEnvironment chainEnv =
 StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration)) {
 chainEnv.fromElements(1)
 .map(x -> x)
 // should automatically break chain here
 .map(x -> x)
-.setMaxParallelism(1)
+.setMaxParallelism(10)

Review Comment:
   This verifies that the chain gets broken. The legacy source was not enforcing 
a max parallelism of 1, something that we now do by propagating the call to super 
(https://github.com/afedulov/flink/blob/cf1a29d47a5bb4fb92e98a36934e525d74bae17b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L208).
 Notice that the default max parallelism in the config also got changed above 
from 10 to 1. So now we start with a source whose max parallelism is 1 and break 
the chain because the second map has a max parallelism of 10. Previously it was 
doing the same, but in "reverse".
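   For reference, a self-contained sketch mirroring the updated test 
(assumptions: the chaining option below exists as in this PR's branch, and a 
max-parallelism mismatch breaks the chain when the option is disabled):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.PipelineOptions;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   public class ChainBreakSketch {
       public static void main(String[] args) throws Exception {
           Configuration configuration = new Configuration();
           // Disallow chaining operators whose max parallelism differs.
           configuration.set(
                   PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
                   false);
           configuration.set(PipelineOptions.MAX_PARALLELISM, 1);
           try (StreamExecutionEnvironment env =
                   StreamExecutionEnvironment.createLocalEnvironment(1, configuration)) {
               env.fromElements(1)
                       .map(x -> x)
                       // max parallelism 1 (source side) vs 10 (below): chain breaks here
                       .map(x -> x)
                       .setMaxParallelism(10)
                       .print();
               env.execute("chain-break-sketch");
           }
       }
   }
   ```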



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370824409


##
flink-formats/flink-parquet/src/test/resources/avro/user.avsc:
##
@@ -1,9 +0,0 @@
-{
-  "namespace": "org.apache.flink.connector.datagen.source.generated",
-  "type": "record",
-  "name": "User",

Review Comment:
   I added it accidentally to the wrong package in one of the earlier commits 
and this [tmp] commit just cleans it up. I got rid of both changes.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33121] Failed precondition in JobExceptionsHandler due to concurrent global failures [flink]

2023-10-24 Thread via GitHub


pgaref commented on PR #23440:
URL: https://github.com/apache/flink/pull/23440#issuecomment-1778042754

   Hey @dmvk -- thanks for the comments!
   
   1. I was debating that; however, testing it e2e would require adding the 
mocked AdaptiveScheduler dependency (like `ExceptionHistoryTester`) to 
JobExceptionsHandlerTest -- which, even though possible, would add more 
complexity than value IMO, as we just need to ensure that the order of entries 
for the conversion is correct (similar to what we do in the AdaptiveSchedulerTest 
in this PR)
   2. Regarding `testExceptionHistoryWithTaskConcurrentGlobalFailure`, two 
concurrent Global failures could never occur with the current logic in the 
AdaptiveScheduler -- so I modified the test to represent reality and include our 
case too
   3. This issue is not related to failure enrichment at all -- it is just not 
very common, as there have to be one or more concurrent Task/Local failures 
followed by a Global one


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]

2023-10-24 Thread via GitHub


dalelane commented on PR #23395:
URL: https://github.com/apache/flink/pull/23395#issuecomment-1778020433

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]

2023-10-24 Thread via GitHub


dalelane commented on PR #23395:
URL: https://github.com/apache/flink/pull/23395#issuecomment-1777990170

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370765358


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1161,9 +1163,10 @@ void 
testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
  */
 @Test
 void testYieldingOperatorProperlyChainedOnLegacySources() {
+// TODO: this test can be removed when the legacy SourceFunction API 
gets removed
 StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
 
-chainEnv.fromElements(1)
+chainEnv.addSource(new LegacySource())

Review Comment:
   Seems so. Has something to do with threading:
   ```
   [FLINK-16219][runtime] Disallow chaining of legacy source and yielding 
operator.
   
   This change allows yielding operators to be eagerly chained whenever 
possible, except after legacy sources.
   Yielding operators do not properly work when processInput is called from 
another thread, but are usually fine in any other chain.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370762519


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * <p>This generator function serializes the elements using Flink's type information. That way,
+ * any object transport using Java serialization will not be affected by the serializability of
+ * the elements.
+ *
+ * <p>NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+        implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+    /** The (de)serializer to be used for the data elements. */
+    private @Nullable TypeSerializer<OUT> serializer;
+
+    /** The actual data elements, in serialized form. */
+    private byte[] elementsSerialized;
+
+    /** The number of elements emitted already. */
+    private int numElementsEmitted;
+
+    private final transient Iterable<OUT> elements;
+    private transient DataInputView input;
+
+    public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements)
+            throws IOException {
+        this(serializer, Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements)
+            throws IOException {
+        this.serializer = Preconditions.checkNotNull(serializer);
+        this.elements = elements;
+        serializeElements();
+    }
+
+    @SafeVarargs
+    public FromElementsGeneratorFunction(OUT... elements) {
+        this(Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(Iterable<OUT> elements) {
+        this.serializer = null;
+        this.elements = elements;
+        checkIterable(elements, Object.class);
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public TypeSerializer<OUT> getSerializer() {
+        return serializer;
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        LOG.info("Serializing elements using {}", serializer);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+
+        try {
+            for (OUT element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (Exception e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    @Override
+    public void open(SourceReaderContext readerContext) throws Exception {
+ByteArrayInputSt

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757864


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type 
information. That way, any
+ * object transport using Java serialization will not be affected by the 
serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+/** The (de)serializer to be used for the data elements. */
+private @Nullable TypeSerializer<OUT> serializer;
+
+/** The actual data elements, in serialized form. */
+private byte[] elementsSerialized;
+
+/** The number of elements emitted already. */
+private int numElementsEmitted;
+
+private final transient Iterable<OUT> elements;
+private transient DataInputView input;
+
+public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements)
+throws IOException {
+this(serializer, Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements)
+throws IOException {
+this.serializer = Preconditions.checkNotNull(serializer);
+this.elements = elements;
+serializeElements();
+}
+
+@SafeVarargs
+public FromElementsGeneratorFunction(OUT... elements) {
+this(Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(Iterable<OUT> elements) {
+this.serializer = null;
+this.elements = elements;
+checkIterable(elements, Object.class);
+}
+
+@VisibleForTesting
+@Nullable
+public TypeSerializer<OUT> getSerializer() {
+return serializer;
+}
+
+private void serializeElements() throws IOException {
+Preconditions.checkState(serializer != null, "serializer not set");
+LOG.info("Serializing elements using  " + serializer);
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+
+try {
+for (OUT element : elements) {
+serializer.serialize(element, wrapper);
+}
+} catch (Exception e) {
+throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+}
+this.elementsSerialized = baos.toByteArray();
+}
+
+@Override
+public void open(SourceReaderContext readerContext) throws Exception {
+ByteArrayInputSt

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757728


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type 
information. That way, any
+ * object transport using Java serialization will not be affected by the 
serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.

Review Comment:
   >Theoretically we could also fix this (in a follow-up!) but it doesn't seem 
worth the overhead given the number of elements. Oh I think this already works; 
see below comment.
   
   Good point. I mainly decided to restrict it because this is what the current underlying SourceFunction (FromElementsFunction) was delivering. I am not sure what the original intention was behind limiting it to a parallelism of 1.
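   For context, a minimal sketch of how this generator function is meant to be driven by the FLIP-27 DataGeneratorSource (wiring inferred from this PR; details may differ from the final fromElements implementation):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromElementsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The generator function maps the index emitted by the source (0..count-1)
        // to the pre-serialized elements.
        FromElementsGeneratorFunction<String> generator =
                new FromElementsGeneratorFunction<>(StringSerializer.INSTANCE, "a", "b", "c");

        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(generator, 3, Types.STRING);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "from-elements").print();
        env.execute();
    }
}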



##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import ja

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##
@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
 int maxParallelism = 42;
 
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
-DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128);
+DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129);

Review Comment:
   Yes, this fixes test failures that arise because of this: 
https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307
   The SingleOutputStreamOperator caps the max parallelism to 1.
   Since this PR is already pretty sizable, it seemed appropriate to postpone dealing with this until we work on `fromSequence`.
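   A hedged illustration of the difference the test change works around:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class MaxParallelismSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // fromSequence keeps the max parallelism configurable, hence the switch in the test:
        DataStream<Long> ok = env.fromSequence(1, 4).setMaxParallelism(128);

        // With the FLIP-27-based fromElements, the returned operator is capped at
        // max parallelism 1, so this is expected to fail at graph construction:
        DataStream<Integer> fails = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
    }
}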



##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5:
##
@@ -19,8 +19,8 @@ Method 

 calls method 

 in (RecreateOnResetOperatorCoordinator.java:361)
 Method 
 calls method 
 in (TaskManagerConfiguration.java:244)
 Method 
 calls method 

 in (TaskManagerConfiguration.java:246)
-Method 
 calls method 
 in (TaskManagerServices.java:433)
-Method 
 calls method 

 in (TaskManagerServices.java:431)

Review Comment:
   Indeed. I had to enable refreeze to add the missing datagen source violations, but how exactly it is supposed to work in ArchUnit is still a bit of a mystery to me, to be honest. 



##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##
@@ -64,6 +64,19 @@ Constructor 
(int, 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue,
 org.apache.flink.connector.base.source.reader.splitreader.SplitReader, 
java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, 
boolean)> calls method 
 in 
(SplitFetcher.java:97)
 Constructor 
(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue,
 java.util.function.Supplier, org.apache.flink.configuration.Configuration, 
java.util.function.Consumer)> is annotated with 
 in (SplitFetcherManager.java:0)
 Constructor 
(int)>
 calls method  in (FutureCompletingBlockingQueue.java:114)
+Constructor 
(org.apache.flink.api.common.typeutils.TypeSerializer,
 java.lang.Iterable)> calls method 
 in 
(FromElementsGeneratorFunction.java:85)

Review Comment:
   That's what I thought too. As also tracked in the wiki, the DataGen connector will not be externalized:
   
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32986][test] Fix createTemporaryFunction type inference error [flink]

2023-10-24 Thread via GitHub


jeyhunkarimov commented on PR #23586:
URL: https://github.com/apache/flink/pull/23586#issuecomment-1777952204

   Hi @snuyanzin 
   Could you please review the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33321][hotfix] VertexFlameGraphFactoryTest#verifyRecursively doesn't work on java 21 [flink]

2023-10-24 Thread via GitHub


snuyanzin merged PR #23583:
URL: https://github.com/apache/flink/pull/23583


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32896][Runtime/Coordination] Incorrect `Map.computeIfAbsent(..., ...::new)` usage which misinterprets key as initial capacity [flink]

2023-10-24 Thread via GitHub


tzy-0x7cf commented on PR #23518:
URL: https://github.com/apache/flink/pull/23518#issuecomment-1777866572

   > Could you squash the commits and rebase the branch to the most-recent `master`? We don't want to have merge commits cluttering the git history.
   > 
   > The Flink CI bot is known to have issues with force-pushes. You can try to work around it by pushing an empty commit after you've reorganized and rebased the branch (in a separate push, to be on the safe side).
   
   Thanks Matthias! I'm new to Flink and would like to contribute more, so if there are any test-related or simple issues, please feel free to assign them to me!
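   (For readers skimming the thread: a minimal, self-contained illustration of the bug pattern named in the PR title; the map and key names here are made up.)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentPitfall {
    public static void main(String[] args) {
        Map<Integer, List<String>> listsById = new HashMap<>();

        // Bug pattern: ArrayList::new matches Function<Integer, ArrayList<String>>
        // via the ArrayList(int initialCapacity) constructor, so the key is
        // silently misinterpreted as an initial capacity.
        listsById.computeIfAbsent(1024, ArrayList::new);

        // Fix: ignore the key and use the no-arg constructor.
        listsById.computeIfAbsent(1024, key -> new ArrayList<>());
    }
}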


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]

2023-10-24 Thread via GitHub


dalelane commented on PR #23395:
URL: https://github.com/apache/flink/pull/23395#issuecomment-1777847541

   Thanks for the reviews - much appreciated 👍


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]

2023-10-24 Thread via GitHub


dalelane commented on code in PR #23395:
URL: https://github.com/apache/flink/pull/23395#discussion_r1370689647


##
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java:
##
@@ -222,12 +234,14 @@ void testSerializeDeserializeBasedOnNestedSchema() throws 
Exception {
 AvroRowDataSerializationSchema serializationSchema =
 new AvroRowDataSerializationSchema(
 rowType,
-
AvroSerializationSchema.forGeneric(nullableOuterSchema),
+AvroSerializationSchema.forGeneric(

Review Comment:
   good point - updated the test in the same way with ParameterizedTest



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33058][formats] Add encoding option to Avro format [flink]

2023-10-24 Thread via GitHub


dalelane commented on code in PR #23395:
URL: https://github.com/apache/flink/pull/23395#discussion_r1370649857


##
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java:
##
@@ -37,46 +38,56 @@ class AvroDeserializationSchemaTest {
 
 private static final Address address = 
TestDataGenerator.generateRandomAddress(new Random());
 
+private static final AvroEncoding[] ENCODINGS = {AvroEncoding.BINARY, 
AvroEncoding.JSON};
+
 @Test
 void testNullRecord() throws Exception {

Review Comment:
   Nice - that's a new one on me, thanks. That's much better
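   Presumably the suggestion was JUnit 5 parameterized tests; a hedged sketch of the shape (AvroEncoding stubbed in as a stand-in for the PR's enum):

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class AvroEncodingParameterizedSketch {
    enum AvroEncoding { BINARY, JSON } // stand-in for the PR's enum

    @ParameterizedTest
    @EnumSource(AvroEncoding.class) // runs the test once per encoding
    void testNullRecord(AvroEncoding encoding) {
        // build the (de)serialization schema with the given encoding and assert
        // the null-record behaviour, instead of looping over a hand-maintained
        // ENCODINGS array
    }
}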



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data [flink-connector-kafka]

2023-10-24 Thread via GitHub


tzulitai closed pull request #52: [FLINK-28303] Support 
LatestOffsetsInitializer to avoid latest-offset strategy lose data
URL: https://github.com/apache/flink-connector-kafka/pull/52


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33187) Don't record duplicate event if no change

2023-10-24 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779195#comment-17779195
 ] 

Gyula Fora commented on FLINK-33187:


merged to main faaff564e1bb3d8ca51c939d34dd416585a3de74

> Don't record duplicate event if no change
> -
>
> Key: FLINK-33187
> URL: https://issues.apache.org/jira/browse/FLINK-33187
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 1.17.1
>Reporter: Clara Xiong
>Assignee: Clara Xiong
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> Problem:
> Some events are recorded repeatedly, such as ScalingReport when autoscaling is 
> not enabled, which makes up 99% of all events in our prod env. This wastes 
> resources and causes performance issues downstream.
> Proposal:
> Suppress duplicate events within an interval defined by a new operator config 
> "scaling.report.interval" in seconds, defaulting to 1800.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


gyfora merged PR #685:
URL: https://github.com/apache/flink-kubernetes-operator/pull/685


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


clarax commented on code in PR #685:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370455045


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -201,8 +201,8 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
-public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL =
-autoScalerConfig("scaling.report.interval")
+public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
+autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofSeconds(1800))
 .withDescription("Time interval to resend the identical event");

Review Comment:
   > As this comment[1] mentioned: `all config keys to work with the "old" 
syntax at least in the 1.7.0 release.`, so please update it, thanks!
   > 
   > Sorry for that, I didn't support the old key during decoupling autoscaler 
and kubernetes-operator.
   > 
   > [#686 
(comment)](https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1369777698)
   updated.
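   For reference, a hedged sketch of how the renamed option could keep the pre-1.7.0 key working via Flink's deprecated-key fallback (the `job.autoscaler.` prefix and the delegation to ConfigOptions are assumptions about what autoScalerConfig() does):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

public class AutoScalerOptionsSketch {
    // Hypothetical prefix; the real autoScalerConfig() helper prepends the
    // autoscaler's config namespace.
    private static final String PREFIX = "job.autoscaler.";

    public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
            ConfigOptions.key(PREFIX + "scaling.event.interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1800))
                    // Configurations still using the old key resolve to this option.
                    .withDeprecatedKeys(PREFIX + "scaling.report.interval")
                    .withDescription("Time interval to resend the identical event");
}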



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process

2023-10-24 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779150#comment-17779150
 ] 

Jing Ge edited comment on FLINK-33301 at 10/24/23 3:45 PM:
---

The value (my intention) was to fail faster, before Maven is called; like I 
mentioned, the entry point is the script, not Maven. IMHO, anything we build to 
improve efficiency will need to be maintained. That is the cost. The key 
point is how often the built thing will be used vs. how often it needs to be 
maintained. Afaik, the versions of Java or Maven might need to be 
changed/maintained once every 2 years or even longer. 

But I got your point. Your thoughts are rational too. Let's stay with your 
solution, and we can still come back to it after we get more feeling/feedback. WDYT?


was (Author: jingge):
The value(my intention) was to fail faster before maven is called, like I 
mentioned the entry point is script not maven. IMHO, anything we built to 
improve the efficiency will need to be maintained. That is the cost. The key 
point is how often the built thing will be used vs. how often it need to be 
maintained. Afaik, the versions of Java or maven might need to be 
changed/maintained once every 2 years or even longer. 

 

But, I got your point. Your thoughts are rational too. Let's stay with your 
solution and we could still come back after we got more feeling/feedback about 
it. WDYT?

> Add Java and Maven version checks in the bash script of Flink release process
> -
>
> Key: FLINK-33301
> URL: https://issues.apache.org/jira/browse/FLINK-33301
> Project: Flink
>  Issue Type: Bug
>  Components: Release System
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  Labels: pull-request-available
>
> During the release, Flink requires specific version of Java and Maven[1]. It 
> makes sense to check those versions at the very beginning of some bash 
> scripts to let it fail fast and therefore improve the efficiency.
>  
> [1][https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0|https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31863) Add 'Hostname' enum val to k8s NodeAddress type.

2023-10-24 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31863:
--
Fix Version/s: (was: 1.17.2)

> Add 'Hostname' enum val to k8s NodeAddress type.
> 
>
> Key: FLINK-31863
> URL: https://issues.apache.org/jira/browse/FLINK-31863
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.17.0
> Environment: OS: CentOS 7
> Kubernetes: v1.18.5
>Reporter: Vince.Feng
>Priority: Major
>  Labels: easyfix, kubernetes, pull-request-available
> Attachments: image-2023-04-20-17-53-30-969.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Class io.fabric8.kubernetes.api.model.NodeAddress.type contains 
> 'InternalIP','ExternalIP' and 'Hostname'. The InternalIP address is 
> unavailable in the private cloud environment. But the hostname can be 
> resolved by the DNS server. So 
> 'org.apache.flink.kubernetes.configuration.NodePortAddressType.NodePortAddressType'
>  should add 'Hostname' enumeration value.
> {code:java}
> //org.apache.flink.kubernetes.configuration.NodePortAddressType.NodePortAddressType
>     public enum NodePortAddressType {
>         InternalIP,
>         ExternalIP,
>         Hostname
>     } {code}
>  
> !image-2023-04-20-17-53-30-969.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31601) While waiting for resources, resources check might be scheduled unlimited number of times (Adaptive Scheduler)

2023-10-24 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31601:
--
Fix Version/s: (was: 1.17.2)

> While waiting for resources, resources check might be scheduled unlimited 
> number of times (Adaptive Scheduler)
> --
>
> Key: FLINK-31601
> URL: https://issues.apache.org/jira/browse/FLINK-31601
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: Roman Khachatryan
>Priority: Minor
>
> See [https://github.com/apache/flink/pull/22169#discussion_r1136395017]
> {quote}when {{resourceStabilizationDeadline}} is not null, should we skip 
> scheduling {{checkDesiredOrSufficientResourcesAvailable}} (on [line 
> 166|https://github.com/apache/flink/blob/a64781b1ef8f129021bdcddd3b07548e6caa4a72/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java#L166])?
> Otherwise, we schedule as many checks as there are changes in resources.
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process

2023-10-24 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779101#comment-17779101
 ] 

Jing Ge edited comment on FLINK-33301 at 10/24/23 3:45 PM:
---

Thanks for sharing your thoughts. I am trying to share my experience, and my 
feeling is that most release managers will face the same issues. These 
are my thoughts:
 # Yes, using the pom to control the versions centrally is a good idea. But the 
current Flink release is not purely Maven based; it contains many scripts beyond 
Maven. If someone like me has the wrong versions of Java or Maven, it will only 
fail at the time when mvn -Prelease is called.
 # During the Flink 1.18 release I ran the release process 4 times and had other 
work in between. Since Flink is an open source project, I guess what I was 
facing is very common: release managers will have other work in parallel. Java 
and Maven versions will be switched back and forth depending on which project 
the release manager is working on. Each time I switched back to the Flink 
release, I would not always call mvn -Prelease, because I had already done the 
preparation. I just jumped into the "Build a release candidate" section. All 
scripts described on the wiki page will be executed. And the fact is that I 
created my own protocol/program to execute the release in order to move faster 
without reading the long content. At that time, my only focus was those scripts. I 
had the version issues twice more in addition to the one detected by rc2. As a 
release manager, I am keen to have the check at the script level, because it is the 
entry point of my work. I might have my own script on top of the official 
scripts to save even more time, like I did with the PR, to have the following 
error messages (as an example) at the very beginning of any script I might call:

{code:java}
Java version is incorrect. Required version: 1.8, but it is 17.0.8

Maven version is incorrect. Required version: 3.8.6, but it is 3.9.1{code}
 Since the output is controlled by us, we can tell release managers anything we 
want to help them work more efficiently.

This is my personal experience; I want to save all future release managers the 
time I could have saved for myself. I am not sure if Maven could help me the 
way I required.

 

 


was (Author: jingge):
Thanks for sharing your thoughts. I am trying to share my experience and 
feeling that most release managers will be facing the same issues. Following 
are my thoughts:


 # Yes, using pom to control the versions centrally is a good idea. But the 
current Flink release is not pure maven based, it contains many scripts beyond 
Maven. If someone like me has wrong versions of Java or Maven, it can only be 
failed at the time when maven -Prelease is called.
 # During Flink 1.18 release I ran the release process 4 times and I have other 
works in between. Since Flink is an open source project, I guess what I was 
facing is very common, release managers will have other works in parallel. Java 
and Maven versions will be switched back and forth depending on which project 
the release manager is working on. Each time, when I switched back to Flink 
release, I would not always call mvn -Prelease, because I already did the 
preparation. I just jumped into the "Build a release candidate" section.  All 
scripts described on the wiki page will be executed. And the fact is that I 
created my own protocol/program to execute the release in order to move faster 
without read the long content. At that time, my only focus was those scripts. I 
had the version issues twice more in addition to the one detected by rc2. As a 
release manager, I am keen to have the check at script level, because it is the 
entry point of my work. I might have my own script on top of the official 
scripts to save even more time. Like I did with the PR to have the following 
error messages(as example) at very beginning with any script I might call:


{code:java}
Java version is incorrect. Required version: 1.8, but it is 17.0.8

Maven version is incorrect. Required version: 3.8.6, but it is 3.9.1{code}
 Since the output is controlled by us, we can tell release managers anything we 
want to help them work more efficiently.

This is my personal experience, I want to save all future release managers' 
time I could have saved. I am not sure if Maven could help me the way I 
required.

 

 

> Add Java and Maven version checks in the bash script of Flink release process
> -
>
> Key: FLINK-33301
> URL: https://issues.apache.org/jira/browse/FLINK-33301
> Project: Flink
>  Issue Type: Bug
>  Components: Release System
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  L

[jira] [Comment Edited] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process

2023-10-24 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779150#comment-17779150
 ] 

Jing Ge edited comment on FLINK-33301 at 10/24/23 3:44 PM:
---

The value (my intention) was to fail faster, before Maven is called; like I 
mentioned, the entry point is the script, not Maven. IMHO, anything we build to 
improve efficiency will need to be maintained. That is the cost. The key 
point is how often the built thing will be used vs. how often it needs to be 
maintained. Afaik, the versions of Java or Maven might need to be 
changed/maintained once every 2 years or even longer. 

But I got your point. Your thoughts are rational too. Let's stay with your 
solution, and we can still come back to it after we get more feeling/feedback. WDYT?


was (Author: jingge):
The value(my intention) is to fail faster before maven is called, like I 
mentioned the entry point is script not maven. IMHO, anything we built to 
improve the efficiency will need to be maintained. That is the cost. The key 
point is how often the built thing will be used vs. how often it need to be 
maintained. Afaik, the versions of Java or maven might need to be 
changed/maintained once every 2 years or even longer. 

 

But, I got your point. Your thoughts are rational too. Let's stay with your 
solution and we could still come back after we got more feeling/feedback about 
it. WDYT?

> Add Java and Maven version checks in the bash script of Flink release process
> -
>
> Key: FLINK-33301
> URL: https://issues.apache.org/jira/browse/FLINK-33301
> Project: Flink
>  Issue Type: Bug
>  Components: Release System
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  Labels: pull-request-available
>
> During the release, Flink requires specific version of Java and Maven[1]. It 
> makes sense to check those versions at the very beginning of some bash 
> scripts to let it fail fast and therefore improve the efficiency.
>  
> [1][https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0|https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31875) OSS throws NoClassDefFoundError due to old hadoop-common version

2023-10-24 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31875:
--
Fix Version/s: (was: 1.16.3)
   (was: 1.17.2)

> OSS throws NoClassDefFoundError due to old hadoop-common version
> -
>
> Key: FLINK-31875
> URL: https://issues.apache.org/jira/browse/FLINK-31875
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> h2. Problem
> When using OSS in 1.17, an exception will be thrown:
> {code:java}
> java.lang.NoClassDefFoundError: 
> org/apache/hadoop/thirdparty/com/google/common/base/Preconditions
> at 
> org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption(AliyunOSSUtils.java:221)
> at 
> org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:343)
> at 
> org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:147)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274){code}
>  It could be reproduced in ITCASE of OSS if some envs has been configured.
> h2. Why
> After https://issues.apache.org/jira/browse/FLINK-27308 and  
> https://issues.apache.org/jira/browse/FLINK-29502, hadoop-aliyun has also been 
> upgraded to 3.3.4, which relies on the newest version of hadoop-common.
> OSS still uses the old version (2.10.2) inherited from flink-parent, so 
> some classes cannot be found.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30219) Fetch results api in sql gateway return error result.

2023-10-24 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-30219:
--
Fix Version/s: (was: 1.16.3)

> Fetch results api in sql gateway return error result.
> -
>
> Key: FLINK-30219
> URL: https://issues.apache.org/jira/browse/FLINK-30219
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: Aiden Gong
>Assignee: Aiden Gong
>Priority: Critical
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2022-11-26-10-38-02-270.png
>
>
> !image-2022-11-26-10-38-02-270.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33089) Drop Flink 1.13 and 1.14 support for the operator

2023-10-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-33089:
---
Description: 
As agreed with the community we will only support the last 4 stable Flink minor 
versions.
With Flink 1.18 already out, we should drop 1.13 and 1.14 support from the 
operator.

This includes any special codepaths required and we should probably throw a 
validation error and short-circuit reconciliation on unsupported versions to 
signal to users and avoid any accidental deployment problems.

  was:
As agreed with the community we will only support the last 4 stable Flink minor 
versions.
With Flink 1.17 already out, we should drop 1.13 support from the operator.

This includes any special codepaths required and we should probably throw a 
validation error and short-circuit reconciliation on unsupported versions to 
signal to users and avoid any accidental deployment problems.


> Drop Flink 1.13 and 1.14 support for the operator
> -
>
> Key: FLINK-33089
> URL: https://issues.apache.org/jira/browse/FLINK-33089
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.7.0
>
>
> As agreed with the community we will only support the last 4 stable Flink 
> minor versions.
> With Flink 1.18 already out, we should drop 1.13 and 1.14 support from the 
> operator.
> This includes any special codepaths required and we should probably throw a 
> validation error and short-circuit reconciliation on unsupported versions to 
> signal to users and avoid any accidental deployment problems.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33089) Drop Flink 1.13 and 1.14 support for the operator

2023-10-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-33089:
---
Summary: Drop Flink 1.13 and 1.14 support for the operator  (was: Drop 
Flink 1.13 support)

> Drop Flink 1.13 and 1.14 support for the operator
> -
>
> Key: FLINK-33089
> URL: https://issues.apache.org/jira/browse/FLINK-33089
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.7.0
>
>
> As agreed with the community we will only support the last 4 stable Flink 
> minor versions.
> With Flink 1.17 already out, we should drop 1.13 support from the operator.
> This includes any special codepaths required and we should probably throw a 
> validation error and short-circuit reconciliation on unsupported versions to 
> signal to users and avoid any accidental deployment problems.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process

2023-10-24 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779150#comment-17779150
 ] 

Jing Ge commented on FLINK-33301:
-

The value (my intention) is to fail faster, before Maven is called; like I 
mentioned, the entry point is the script, not Maven. IMHO, anything we build to 
improve efficiency will need to be maintained. That is the cost. The key 
point is how often the built thing will be used vs. how often it needs to be 
maintained. Afaik, the versions of Java or Maven might need to be 
changed/maintained once every 2 years or even longer. 

But I got your point. Your thoughts are rational too. Let's stay with your 
solution, and we can still come back to it after we get more feeling/feedback. WDYT?

> Add Java and Maven version checks in the bash script of Flink release process
> -
>
> Key: FLINK-33301
> URL: https://issues.apache.org/jira/browse/FLINK-33301
> Project: Flink
>  Issue Type: Bug
>  Components: Release System
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  Labels: pull-request-available
>
> During the release, Flink requires specific version of Java and Maven[1]. It 
> makes sense to check those versions at the very beginning of some bash 
> scripts to let it fail fast and therefore improve the efficiency.
>  
> [1][https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0|https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33353) SQL fails because "TimestampType.kind" is not serialized

2023-10-24 Thread Ferenc Csaky (Jira)
Ferenc Csaky created FLINK-33353:


 Summary: SQL fails because "TimestampType.kind" is not serialized 
 Key: FLINK-33353
 URL: https://issues.apache.org/jira/browse/FLINK-33353
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Ferenc Csaky


We have a custom persistent catalog store, which stores tables, views, etc. in a 
DB. In our application, it is required to work with the serialized formats of 
these entities; the same applies to Hive, as it functions as a persistent 
catalog.

Take the following example SQL:

{code:sql}
CREATE TABLE IF NOT EXISTS `txn_gen` (
  `txn_id` INT,
  `amount` INT,
  `ts` TIMESTAMP(3),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND
) WITH (
  'connector' = 'datagen',
  'fields.txn_id.min' = '1',
  'fields.txn_id.max' = '5',
  'rows-per-second' = '1'
);

CREATE VIEW IF NOT EXISTS aggr_ten_sec AS
  SELECT txn_id,
 TUMBLE_ROWTIME(`ts`, INTERVAL '10' SECOND) AS w_row_time,
 COUNT(txn_id) AS txn_count
FROM txn_gen
GROUP BY txn_id, TUMBLE(`ts`, INTERVAL '10' SECOND);

SELECT txn_id,
   SUM(txn_count),
   TUMBLE_START(w_row_time, INTERVAL '20' SECOND) AS total_txn_count
  FROM aggr_ten_sec
  GROUP BY txn_id, TUMBLE(w_row_time, INTERVAL '20' SECOND);
{code}

This will work without any problems when we simply execute it in a 
{{TableEnvironment}}, but it fails with the below error when we try to execute 
the query based on the serialized table metadata.
{code}
org.apache.flink.table.api.TableException: Window aggregate can only be defined 
over a time attribute column, but TIMESTAMP(3) encountered.
{code}

If there is a view that requires ROWTIME, this information is lost and we 
cannot recreate the same query from the serialized entities.

Currently, the "kind" field in {{TimestampType}} is deliberately annotated as 
{{@Internal}} and is not serialized, which breaks this functionality.
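
A small sketch of the loss (a hedged illustration; constructor and summary rendering assumed from the logical type API):
{code:java}
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;

public class TimestampKindSketch {
    public static void main(String[] args) {
        TimestampType rowtime = new TimestampType(true, TimestampKind.ROWTIME, 3);
        TimestampType regular = new TimestampType(true, TimestampKind.REGULAR, 3);

        // The kind shows up only in the summary; the serialized form is
        // TIMESTAMP(3) for both, so the ROWTIME attribute is lost on a
        // round trip through a persistent catalog.
        System.out.println(rowtime.asSummaryString()); // e.g. TIMESTAMP(3) *ROWTIME*
        System.out.println(regular.asSummaryString()); // TIMESTAMP(3)
    }
}{code}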




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


gyfora commented on code in PR #685:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370421339


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java:
##
@@ -197,4 +198,19 @@ private static Event buildEvent(
 .endMetadata()
 .build();
 }
+
+private static boolean intervalCheck(Event existing, @Nullable Duration 
interval) {
+return interval != null
+&& Instant.now()
+.isBefore(
+Instant.parse(existing.getLastTimestamp())
+.plusMillis(interval.toMillis()));
+}
+
+private static boolean labelCheck(
+Event existing, Predicate<Map<String, String>> dedupePredicate) {
+return dedupePredicate == null
+|| (existing.getMetadata() != null
+&& dedupePredicate.test(existing.getMetadata().getLabels()));
+}

Review Comment:
   makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


clarax commented on code in PR #685:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370421148


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -201,8 +201,8 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
-public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL =
-autoScalerConfig("scaling.report.interval")
+public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
+autoScalerConfig("scaling.event.interval")

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33187] using hashcode for parallelism map comparison [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


clarax commented on code in PR #685:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/685#discussion_r1370420200


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java:
##
@@ -197,4 +198,19 @@ private static Event buildEvent(
 .endMetadata()
 .build();
 }
+
+private static boolean intervalCheck(Event existing, @Nullable Duration 
interval) {
+return interval != null
+&& Instant.now()
+.isBefore(
+Instant.parse(existing.getLastTimestamp())
+.plusMillis(interval.toMillis()));
+}
+
+private static boolean labelCheck(
+Event existing, Predicate<Map<String, String>> dedupePredicate) {
+return dedupePredicate == null
+|| (existing.getMetadata() != null
+&& dedupePredicate.test(existing.getMetadata().getLabels()));
+}

Review Comment:
   When interval == null, we don't dedupe; this is intentional, and it is documented in the interface method param and unit tested.
   But when the config is null, we use the default value of 30 min. It is also documented and unit tested.
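   A standalone illustration of the null-interval semantics described above (simplified types; a hedged sketch, not the operator's actual code):

import java.time.Duration;
import java.time.Instant;

class IntervalCheckSketch {
    // Mirrors intervalCheck() above: dedupe only if an interval is configured
    // and the previous occurrence of the event is still within it.
    static boolean withinInterval(Instant lastTimestamp, Duration interval) {
        return interval != null
                && Instant.now().isBefore(lastTimestamp.plus(interval));
    }

    public static void main(String[] args) {
        Instant tenMinutesAgo = Instant.now().minusSeconds(600);
        System.out.println(withinInterval(tenMinutesAgo, Duration.ofMinutes(30))); // true  -> suppress
        System.out.println(withinInterval(tenMinutesAgo, null));                   // false -> emit again
    }
}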



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33306) Use observed true processing rate when source metrics are incorrect

2023-10-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-33306.
--
Resolution: Fixed

merged to main cc680e142bb8d52c4db215658ee7f4c4159a0fe4

> Use observed true processing rate when source metrics are incorrect
> ---
>
> Key: FLINK-33306
> URL: https://issues.apache.org/jira/browse/FLINK-33306
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The aim is to address the cases where Flink incorrectly reports low busy time 
> (high idleness) for sources that in fact cannot keep up due to the 
> slowness of the readers/fetchers. As the metrics cannot generally be fixed on 
> the Flink/connector side, we have to detect this and handle it when 
> collecting the metrics.
> The main symptom of this problem is overestimation of the true processing 
> rate and not triggering scaling even while lag is building up, because the 
> autoscaler thinks it will be able to keep up.
> To tackle this we differentiate two different methods of TPR measurement:
>  # *Busy-time based TPR* (this is the current approach in the autoscaler): 
> computed from incoming records and busy time
>  # *Observed TPR*: computed from incoming records and back pressure, 
> measurable only when we can assume full processing throughput (i.e. during 
> catch-up)
> h3. Current behaviour
> The operator currently always uses a busy-time based TPR calculation, which is 
> very flexible and allows for scaling up / down but is susceptible to 
> overestimation due to the broken metrics.
> h3. Suggested new behaviour
> Instead of using the busy-time based TPR, we detect when TPR is overestimated 
> (busy time too low) and switch to observed TPR.
> To do this, whenever there is lag for a source (during catch-up or 
> lag build-up) we measure both the busy-time based and the observed TPR.
> If the avg busy-time based TPR is off by a configured amount, we switch to 
> observed TPR for this source during metric evaluation.
> *Why not use observed TPR all the time?*
> Observed TPR can only be measured when we are catching up (during 
> stabilization) or when we cannot keep up. This makes it harder to scale down or 
> to detect changes in source throughput over time (before lag starts to build 
> up). Instead of using observed TPR, we switch to it only when we detect a 
> problem with the busy time (this is a rare case overall), to hopefully get 
> the best of both worlds.
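
For intuition, a back-of-the-envelope sketch of the two measurements described above (formulas inferred from this description, not taken from the autoscaler code):
{code:java}
// Busy-time based TPR: extrapolate from the reported busy fraction.
// E.g. 1000 rec/s incoming at busyTimeMsPerSec = 100 gives 1000 / 0.1 = 10000 rec/s;
// if busy time is under-reported, this overestimates the TPR.
static double busyTimeBasedTpr(double recordsInPerSec, double busyTimeMsPerSec) {
    return recordsInPerSec / (busyTimeMsPerSec / 1000.0);
}

// Observed TPR: measured only while there is lag, i.e. the source is assumed
// to run at full throughput, corrected for the back-pressured fraction of time.
static double observedTpr(double recordsInPerSec, double backPressuredFraction) {
    return recordsInPerSec / (1.0 - backPressuredFraction);
}{code}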



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33306] Use observed source throughput as true processing rate [flink-kubernetes-operator]

2023-10-24 Thread via GitHub


gyfora merged PR #686:
URL: https://github.com/apache/flink-kubernetes-operator/pull/686


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33274][release] Add release note for version 1.18 [flink]

2023-10-24 Thread via GitHub


JingGe merged PR #23527:
URL: https://github.com/apache/flink/pull/23527


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779129#comment-17779129
 ] 

Stefan Richter commented on FLINK-33341:


FYI, here is a link to the development branch: 
https://github.com/apache/flink/compare/master...StefanRRichter:flink:srichter-local-rescaling-FLINK-33341

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process

2023-10-24 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779122#comment-17779122
 ] 

Matthias Pohl commented on FLINK-33301:
---

{quote}
Yes, using pom to control the versions centrally is a good idea. But the 
current Flink release is not pure maven based, it contains many scripts beyond 
Maven. If someone like me has wrong versions of Java or Maven, it can only be 
failed at the time when maven -Prelease is called.
{quote}
That is why we have the script. So that you don't have to call {{mvn 
-Prelease}} on your own.

{quote}
During Flink 1.18 release I ran the release process 4 times and I have other 
works in between. Since Flink is an open source project, I guess what I was 
facing is very common, release managers will have other works in parallel. Java 
and Maven versions will be switched back and forth depending on which project 
the release manager is working on. Each time, when I switched back to Flink 
release, I would not always call mvn -Prelease, because I already did the 
preparation. I just jumped into the "Build a release candidate" section.  All 
scripts described on the wiki page will be executed. And the fact is that I 
created my own protocol/program to execute the release in order to move faster 
without read the long content. At that time, my only focus was those scripts. I 
had the version issues twice more in addition to the one detected by rc2. As a 
release manager, I am keen to have the check at script level, because it is the 
entry point of my work. I might have my own script on top of the official 
scripts to save even more time. Like I did with the PR to have the following 
error messages(as example) at very beginning with any script I might call:
{quote}

Just for the record: the only value you're adding is that the error is raised 
when calling {{tools/releasing/create_source_release.sh}}. The other two scripts 
already print the error (because they call Maven's enforcer plugin). 
Additionally, you're introducing a helper script that can be used if you're not 
familiar with the Maven features for invoking the version enforcement correctly. 

In contrast to that, you're adding one additional artifact where we have to 
maintain the Maven and Java versions. You could adapt your script so that 
it uses the Maven command I shared in my comment above. This way, we don't 
introduce new code locations with the versions hard-coded but rely on the 
Maven configuration. That keeps maintainability at the current level.

You could even grep for the relevant output if you're concerned about the 
verbose ERROR output: {{mvn -q -Prelease -pl flink-annotations validate | grep 
-v ERROR}}. WDYT?

> Add Java and Maven version checks in the bash script of Flink release process
> -
>
> Key: FLINK-33301
> URL: https://issues.apache.org/jira/browse/FLINK-33301
> Project: Flink
>  Issue Type: Bug
>  Components: Release System
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Minor
>  Labels: pull-request-available
>
> During the release, Flink requires specific version of Java and Maven[1]. It 
> makes sense to check those versions at the very beginning of some bash 
> scripts to let it fail fast and therefore improve the efficiency.
>  
> [1][https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0|https://lists.apache.org/thread/fbdl2w6wjmwk55y94ml91bpnhmh4rnm0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33301) Add Java and Maven version checks in the bash script of Flink release process

2023-10-24 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779122#comment-17779122
 ] 

Matthias Pohl edited comment on FLINK-33301 at 10/24/23 2:52 PM:
-

{quote}
Yes, using pom to control the versions centrally is a good idea. But the 
current Flink release is not pure maven based, it contains many scripts beyond 
Maven. If someone like me has wrong versions of Java or Maven, it can only be 
failed at the time when maven -Prelease is called.
{quote}
That is why we have the script. So that you don't have to call {{mvn 
-Prelease}} on your own.

{quote}
During the Flink 1.18 release I ran the release process 4 times and had other 
work in between. Since Flink is an open source project, I guess what I was 
facing is very common: release managers will have other work in parallel. Java 
and Maven versions will be switched back and forth depending on which project 
the release manager is working on. Each time I switched back to the Flink 
release, I would not always call mvn -Prelease, because I had already done the 
preparation. I just jumped into the "Build a release candidate" section. All 
scripts described on the wiki page will be executed. And the fact is that I 
created my own protocol/program to execute the release in order to move faster 
without reading the long content. At that time, my only focus was those scripts. 
I ran into the version issues twice more in addition to the one detected by rc2. 
As a release manager, I am keen to have the check at the script level, because 
it is the entry point of my work. I might have my own script on top of the 
official scripts to save even more time. Like I did with the PR, to have the 
following error messages (as an example) at the very beginning of any script I 
might call:
{quote}

Just for the record: the only value you're adding is that the error is raised 
when calling {{tools/releasing/create_source_release.sh}}. The other two scripts 
already print the error (because they call Maven's enforcement plugin). 
Additionally, you're introducing a helper script that can be used by anyone who 
isn't familiar with the Maven features for invoking the version enforcement 
correctly. 

In contrast to that, you're adding one additional artifact where we have to 
maintain the Maven and Java version. You could adapt your script so that it 
uses the Maven command I shared in my comment above. This way, we don't 
introduce new code locations with the version being hard-coded but rely on the 
Maven configuration. That keeps maintainability at the current level.

You could even grep for the relevant output if you're concerned about the 
verbose ERROR output: {{mvn -q -Prelease -pl flink-annotations validate | grep 
-v ERROR}}. WDYT?


was (Author: mapohl):
{quote}
Yes, using the pom to control the versions centrally is a good idea. But the 
current Flink release is not purely Maven-based; it contains many scripts beyond 
Maven. If someone like me has the wrong versions of Java or Maven, the failure 
only surfaces when {{mvn -Prelease}} is called.
{quote}
That is why we have the script. So that you don't have to call {{mvn 
-Prelease}} on your own.

{quote}
During the Flink 1.18 release I ran the release process 4 times and had other 
work in between. Since Flink is an open source project, I guess what I was 
facing is very common: release managers will have other work in parallel. Java 
and Maven versions will be switched back and forth depending on which project 
the release manager is working on. Each time I switched back to the Flink 
release, I would not always call mvn -Prelease, because I had already done the 
preparation. I just jumped into the "Build a release candidate" section. All 
scripts described on the wiki page will be executed. And the fact is that I 
created my own protocol/program to execute the release in order to move faster 
without reading the long content. At that time, my only focus was those scripts. 
I ran into the version issues twice more in addition to the one detected by rc2. 
As a release manager, I am keen to have the check at the script level, because 
it is the entry point of my work. I might have my own script on top of the 
official scripts to save even more time. Like I did with the PR, to have the 
following error messages (as an example) at the very beginning of any script I 
might call:
{quote}

Just for the record: the only value you're adding is that the error is raised 
when calling {{tools/releasing/create_source_release.sh}}. The other two scripts 
already print the error (because they call Maven's enforcement plugin). 
Additionally, you're introducing a helper script that can be used by anyone who 
isn't familiar with the Maven features for invoking the version enforcement 
correctly. 

In contrast to that, you're adding one additional artifact where we have to 
maintain the Maven and Java version. You could adapt your script so that it 
uses the Maven command I shared in my comment above. This way, we don't 
introduce new code locations with the version being hard-coded but rely on the 
Maven configuration. That keeps maintainability at the current level.

[jira] [Commented] (FLINK-33341) Use available local state for rescaling

2023-10-24 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779124#comment-17779124
 ] 

Stefan Richter commented on FLINK-33341:


[~Yanfei Lei], yes, only the previous local state is available for rescaling, 
so we might still need to download additional state from remote storage. But 
oftentimes we don't need to download everything: in particular, when we scale 
out, we will often find the complete state locally on some machines and just 
need to drop some key-groups. And for scale-in, we should find at least one 
piece of the state locally. There is no good reason not to opportunistically 
use local state in rescaling scenarios as well. No change to the scheduler will 
be needed.
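
To illustrate why scale-out often finds complete state locally, here is a 
minimal sketch (not from this ticket) using Flink's {{KeyGroupRangeAssignment}} 
utility from flink-runtime; the demo class itself is hypothetical scaffolding:

{code:java}
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class RescalingKeyGroupDemo {

    public static void main(String[] args) {
        int maxParallelism = 128;

        // Key-group range owned by subtask 0 before rescaling (parallelism 2):
        // key-groups 0..63.
        KeyGroupRange before = KeyGroupRangeAssignment
                .computeKeyGroupRangeForOperatorIndex(maxParallelism, 2, 0);

        // After scaling out to parallelism 4, new subtasks 0 (0..31) and
        // 1 (32..63) own strict subsets of old subtask 0's range, so both can
        // be served from old subtask 0's local state by dropping the
        // key-groups outside their new range.
        for (int i = 0; i < 4; i++) {
            KeyGroupRange after = KeyGroupRangeAssignment
                    .computeKeyGroupRangeForOperatorIndex(maxParallelism, 4, i);
            System.out.printf("new subtask %d: %s, overlap with old subtask 0: %s%n",
                    i, after, after.getIntersection(before));
        }
    }
}
{code}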

> Use available local state for rescaling
> ---
>
> Key: FLINK-33341
> URL: https://issues.apache.org/jira/browse/FLINK-33341
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> Local state is currently only used for recovery. However, it would make sense 
> to also use available local state in rescaling scenarios to reduce the amount 
> of data to download from remote storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33352][rest][docs] Add schema mappings to discriminator properties [flink]

2023-10-24 Thread via GitHub


flinkbot commented on PR #23588:
URL: https://github.com/apache/flink/pull/23588#issuecomment-1777387430

   
## CI report:

* 7a802e3654a0e6f2d68cad0deb6af0c4557f082b UNKNOWN

Bot commands: the @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


