[GitHub] [flink] JunRuiLee commented on pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


JunRuiLee commented on PR #21963:
URL: https://github.com/apache/flink/pull/21963#issuecomment-1443025904

   LGTM.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-23 Thread via GitHub


gaborgsomogyi commented on PR #22009:
URL: https://github.com/apache/flink/pull/22009#issuecomment-1443023755

   The e2e tests are still running, so they will most probably pass.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-23 Thread via GitHub


gaborgsomogyi commented on code in PR #22009:
URL: https://github.com/apache/flink/pull/22009#discussion_r1116617650


##
flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java:
##
@@ -70,10 +71,22 @@ public void install() throws SecurityInstallException {

 try {
     KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig);
-    if (kerberosLoginProvider.isLoginPossible()) {
-        kerberosLoginProvider.doLogin();
+    if (kerberosLoginProvider.isLoginPossible(true)) {
+        kerberosLoginProvider.doLogin(true);
         loginUser = UserGroupInformation.getLoginUser();

+        if (HadoopUserUtils.isProxyUser(loginUser)

Review Comment:
   I think it's good to add this in general, so as not to change the original 
behavior. I'm just writing down a possible future consideration, so no change 
is needed for now.
   
   This pointed me to an edge case where a user wants to use proxy user support 
with non-Hadoop delegation tokens like S3. The original implementation does not 
allow this, but maybe there is a need for it. I need some more time to consider 
this in depth. Later on we might remove this check if use cases require it.
   



##
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java:
##
@@ -57,7 +58,7 @@ public void isLoginPossibleMustReturnFalseByDefault() throws IOException {
     UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);

     ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);

-    assertFalse(kerberosLoginProvider.isLoginPossible());
+    assertFalse(kerberosLoginProvider.isLoginPossible(true));

Review Comment:
   Such original tests are testing the no-proxy cases. I would either set the 
new boolean to false in all non-proxy cases, or test both the proxy and 
non-proxy cases (true and false) to be 100% sure that the original behavior is 
covered. It's your choice; since we're under time pressure I would vote for the 
first.
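The coverage idea above can be sketched in plain Java. The stub class below is an illustrative assumption standing in for Flink's `KerberosLoginProvider`; only the idea of exercising both flag values on the same scenario is taken from the comment.

```java
// Stub modeling isLoginPossible(boolean supportProxyUser); the logic here is
// a hypothetical simplification, not Flink's actual implementation.
public class LoginPossibleCoverageSketch {
    static class StubLoginProvider {
        private final boolean proxyUser;

        StubLoginProvider(boolean proxyUser) {
            this.proxyUser = proxyUser;
        }

        boolean isLoginPossible(boolean supportProxyUser) {
            // Without keytab or ticket cache, login is only "possible" for a
            // proxy user, and only when proxy support is switched on.
            return proxyUser && supportProxyUser;
        }
    }

    public static void main(String[] args) {
        // Exercise both the proxy and the non-proxy flag on the default
        // scenario, pinning down that the original behavior (false) holds.
        StubLoginProvider noCredentials = new StubLoginProvider(false);
        for (boolean supportProxyUser : new boolean[] {false, true}) {
            if (noCredentials.isLoginPossible(supportProxyUser)) {
                throw new AssertionError("must be false regardless of the flag");
            }
        }
        System.out.println("default case returns false for both flag values");
    }
}
```

Running both flag values in one loop is the cheaper of the two options the review offers; the more thorough alternative duplicates each test method per flag value.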



##
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java:
##
@@ -99,6 +101,8 @@ public void doLogin() throws IOException {
     LOG.info("Attempting to load user's ticket cache");
     UserGroupInformation.loginUserFromSubject(null);
     LOG.info("Loaded user's ticket cache successfully");
+} else if (supportProxyUser) {
+    LOG.info("Delegation token fetch is managed and therefore login is managed");

Review Comment:
   Maybe we can say something like: `Proxy user doesn't need login since it 
must have credentials already`. I think `KerberosLoginProvider` shouldn't know 
about tokens at all.
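The branch being discussed can be modeled in plain Java. The enum, method shape, and messages below are illustrative assumptions, not Flink's actual API; the point is only the control flow in which a proxy user skips login entirely.

```java
public class ProxyLoginSketch {
    enum Principal { KEYTAB, TICKET_CACHE, PROXY_USER }

    // Mirrors the structure of doLogin(boolean supportProxyUser): keytab and
    // ticket-cache principals perform a login, while a proxy user is passed
    // through without one when proxy support is enabled.
    static String doLogin(Principal principal, boolean supportProxyUser) {
        if (principal == Principal.KEYTAB) {
            return "login from keytab";
        } else if (principal == Principal.TICKET_CACHE) {
            return "login from ticket cache";
        } else if (principal == Principal.PROXY_USER && supportProxyUser) {
            // The wording suggested in the review comment:
            return "proxy user doesn't need login since it must have credentials already";
        }
        throw new IllegalStateException("login not possible");
    }

    public static void main(String[] args) {
        System.out.println(doLogin(Principal.PROXY_USER, true));
    }
}
```

Framed this way, the class stays token-agnostic, which matches the reviewer's point that `KerberosLoginProvider` shouldn't know about tokens at all.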






[GitHub] [flink] flinkbot commented on pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-02-23 Thread via GitHub


flinkbot commented on PR #22010:
URL: https://github.com/apache/flink/pull/22010#issuecomment-1443013003

   
   ## CI report:
   
   * 5f4a9ee58a62b1b2e4d540ea6293554638722e53 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] xuzhiwen1255 commented on pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-02-23 Thread via GitHub


xuzhiwen1255 commented on PR #22010:
URL: https://github.com/apache/flink/pull/22010#issuecomment-1442997250

   @reswqa Could you review it for me? Thank you.





[jira] [Commented] (FLINK-30583) Provide the flame graph to the subtask level

2023-02-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693055#comment-17693055
 ] 

Piotr Nowojski commented on FLINK-30583:


Thanks for this improvement! I've just found it in the WebUI and docs :)

> Provide the flame graph to the subtask level
> 
>
> Key: FLINK-30583
> URL: https://issues.apache.org/jira/browse/FLINK-30583
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.17.0
>
>
> This is an umbrella Jira about providing the flame graph at the subtask level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80

2023-02-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-27115.
--
Resolution: Fixed

master(1.18) via 67bd4ba7f300749c4831b37df525b82a76d3c36c
release-1.17 via 6170615ed0aa3495c374cce8d33b0ea272b4bccc

> Error while executing BLOB connection. java.io.IOException: Unknown operation 
> 80
> 
>
> Key: FLINK-27115
> URL: https://issues.apache.org/jira/browse/FLINK-27115
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: zhiyuan
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2023-02-23-11-14-16-479.png, 
> image-2023-02-23-11-30-39-406.png
>
>
> hi, 
> I have a Flink SQL job running online. Every morning, it reports the error 
> shown in the attached screenshots. I have enabled the debug log to 
> investigate but found nothing.
> version 1.12
>  
>  





[jira] [Updated] (FLINK-31192) dataGen takes too long to initialize under sequence

2023-02-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31192:
---
Labels: pull-request-available  (was: )

> dataGen takes too long to initialize under sequence
> ---
>
> Key: FLINK-31192
> URL: https://issues.apache.org/jira/browse/FLINK-31192
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: xzw0223
>Assignee: xzw0223
>Priority: Major
>  Labels: pull-request-available
>
> The SequenceGenerator preloads all sequence values in open(). If the total 
> element number is too large, this takes too long.
> [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java#L91]
> The reason is that the capacity of the Deque is doubled whenever the 
> current capacity is full, which requires a time-consuming array copy.
>  
> Here's what I think:
>  # Do not preload the full amount of data in sequence mode; instead, 
> generate one piece of data each time next is called, to solve the slow 
> initialization caused by loading the full amount of data.
>  # Record the currently sent sequence position in the checkpoint, and 
> continue sending data from the recorded position after an abnormal restart 
> to ensure fault tolerance.
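The proposal above (generate lazily, checkpoint only a position) can be sketched as self-contained Java. The class and method names are illustrative; Flink's actual SequenceGenerator and its state-handling differ.

```java
import java.util.NoSuchElementException;

public class LazySequenceSketch {
    static class LazySequence {
        private final long end;
        private long next; // the position to checkpoint, instead of a preloaded Deque

        LazySequence(long start, long end) {
            this.end = end;
            this.next = start;
        }

        boolean hasNext() {
            return next <= end;
        }

        long next() {
            if (!hasNext()) throw new NoSuchElementException();
            return next++; // O(1) per element, no upfront materialization
        }

        long snapshotPosition() { return next; }        // saved in a checkpoint
        void restorePosition(long pos) { next = pos; }  // restored after restart
    }

    public static void main(String[] args) {
        // Construction is cheap even for a huge range: nothing is preloaded.
        LazySequence seq = new LazySequence(0, 1_000_000_000L);
        seq.next(); // emits 0
        seq.next(); // emits 1
        long checkpoint = seq.snapshotPosition(); // 2

        // After an abnormal restart, resume from the recorded position.
        LazySequence restarted = new LazySequence(0, 1_000_000_000L);
        restarted.restorePosition(checkpoint);
        System.out.println("resumes at " + restarted.next());
    }
}
```

Checkpointing a single long instead of the remaining values also shrinks the state footprint dramatically for large ranges.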





[GitHub] [flink] xuzhiwen1255 opened a new pull request, #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-02-23 Thread via GitHub


xuzhiwen1255 opened a new pull request, #22010:
URL: https://github.com/apache/flink/pull/22010

   ## What is the purpose of the change
   In sequence mode, dataGen preloads all the sequence values when it is 
initialized, which can take a long time.
   
   ## Brief change log
   - Change the preloading approach so that a sequence value is generated each 
time the next method is called
   - The checkpoint no longer saves all the sequence values, only the position 
up to which the current task has sent them
   
   ## Verifying this change
   `DataGeneratorSourceTest` and `DataGenTableSourceFactoryTest`
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency):   no 
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:   no 
 - The serializers:   no 
 - The runtime per-record code paths (performance sensitive):   no  
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:   no 
 - The S3 file system connector:   no 
   
   ## Documentation
 - Does this pull request introduce a new feature?  no 
   
   





[jira] [Updated] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80

2023-02-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-27115:
---
Affects Version/s: 1.17.0

> Error while executing BLOB connection. java.io.IOException: Unknown operation 
> 80
> 
>
> Key: FLINK-27115
> URL: https://issues.apache.org/jira/browse/FLINK-27115
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: zhiyuan
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-02-23-11-14-16-479.png, 
> image-2023-02-23-11-30-39-406.png
>
>
> hi, 
> I have a Flink SQL job running online. Every morning, it reports the error 
> shown in the attached screenshots. I have enabled the debug log to 
> investigate but found nothing.
> version 1.12
>  
>  





[jira] [Updated] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80

2023-02-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-27115:
---
Fix Version/s: 1.17.0

> Error while executing BLOB connection. java.io.IOException: Unknown operation 
> 80
> 
>
> Key: FLINK-27115
> URL: https://issues.apache.org/jira/browse/FLINK-27115
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: zhiyuan
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2023-02-23-11-14-16-479.png, 
> image-2023-02-23-11-30-39-406.png
>
>
> hi, 
> I have a Flink SQL job running online. Every morning, it reports the error 
> shown in the attached screenshots. I have enabled the debug log to 
> investigate but found nothing.
> version 1.12
>  
>  





[jira] [Assigned] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80

2023-02-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-27115:
--

Assignee: Weihua Hu

> Error while executing BLOB connection. java.io.IOException: Unknown operation 
> 80
> 
>
> Key: FLINK-27115
> URL: https://issues.apache.org/jira/browse/FLINK-27115
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: zhiyuan
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-02-23-11-14-16-479.png, 
> image-2023-02-23-11-30-39-406.png
>
>
> hi, 
> I have a Flink SQL job running online. Every morning, it reports the error 
> shown in the attached screenshots. I have enabled the debug log to 
> investigate but found nothing.
> version 1.12
>  
>  





[GitHub] [flink] reswqa merged pull request #22004: [FLINK-27115][blob] Enrich the error log with remote address for blobServer

2023-02-23 Thread via GitHub


reswqa merged PR #22004:
URL: https://github.com/apache/flink/pull/22004





[GitHub] [flink] reswqa commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


reswqa commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116565034


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput(
         "savepointResult");
 harness.processAll();

-Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
+assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }

 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
     try (StreamTaskMailboxTestHarness<String> testHarness =
             new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
                     .addInput(STRING_TYPE_INFO)
                     .setupOutputForSingletonOperatorChain(new FailingTwiceOperator())
                     .build()) {

-        try {
-            testHarness.processElement(new StreamRecord<>("Doesn't matter", 0));
-            throw new RuntimeException("Expected an exception but ran successfully");
-        } catch (Exception ex) {
-            ExceptionUtils.assertThrowable(ex, ExpectedTestException.class);
-        }
+        assertThatThrownBy(
+                        () -> {
+                            testHarness.processElement(new StreamRecord<>("Doesn't matter", 0));
+                            throw new RuntimeException("Expected an exception but ran successfully");
+                        })
+                .isInstanceOf(ExpectedTestException.class);

-        try {
-            testHarness.finishProcessing();
-        } catch (Exception ex) {
-            // todo: checking for suppression if there are more exceptions during cleanup
-            ExceptionUtils.assertThrowable(ex, FailingTwiceOperator.CloseException.class);
-        }
+        // todo: checking for suppression if there are more exceptions during cleanup
+        assertThatThrownBy(testHarness::finishProcessing)
+                .isInstanceOf(FailingTwiceOperator.CloseException.class);

Review Comment:
   We don't need to go back to the original implementation. You can see my last 
comment.






[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-23 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693022#comment-17693022
 ] 

Yanfei Lei commented on FLINK-31089:


Let me try to summarize this issue:
 # Enabling PinL0FilterAndIndexBlocksInCache or PinTopLevelIndexAndFilter with 
TTL disabled will result in OOM.
 ## PinTopLevelIndexAndFilter can significantly affect the performance.
 ## PinL0FilterAndIndexBlocksInCache will NOT affect the performance.
 # With PinL0FilterAndIndexBlocksInCache or PinTopLevelIndexAndFilter enabled 
and TTL enabled, the memory doesn't keep growing.
 ## Due to https://issues.apache.org/jira/browse/FLINK-22957 , the TTL can't 
take effect for the Rank operator in Flink 1.13.

Is the TTL set by "table.exec.state.ttl"? If the job is a DataStream job, maybe 
you can set TTL for the rank operator via StateTtlConfig.
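For reference, per-state TTL in a DataStream job goes through StateTtlConfig on the state descriptor. The state name, value type, and one-day TTL below are illustrative values, not taken from the reported job:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Expire entries one day after they are created or updated, and never
// return a value that has already expired.
StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("rank-state", Long.class);
descriptor.enableTimeToLive(ttlConfig);
```

With TTL enabled, expired entries are eventually cleaned up, which is why memory stops growing in case 2 of the summary above.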

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> With setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the picture below, from 48G to 50G in about 5 
> hours). But if we switch it to false, the pinned memory stays relatively 
> static. In our environment, a lot of tasks restart because they are killed 
> by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> The two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.





[GitHub] [flink] reswqa commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


reswqa commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116563643


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -368,17 +368,19 @@ void testCleanUpExceptionSuppressing() throws Exception {
                     .setupOutputForSingletonOperatorChain(new FailingTwiceOperator())
                     .build()) {

-        assertThatThrownBy(
-                        () -> {
-                            testHarness.processElement(new StreamRecord<>("Doesn't matter", 0));
-                            throw new RuntimeException("Expected an exception but ran successfully");
-                        })
-                .isInstanceOf(ExpectedTestException.class);
+        try {
+            testHarness.processElement(new StreamRecord<>("Doesn't matter", 0));
+            throw new RuntimeException("Expected an exception but ran successfully");
+        } catch (Exception ex) {
+            ExceptionUtils.assertThrowable(ex, ExpectedTestException.class);
+        }

Review Comment:
   ```suggestion
            Assertions.assertThatThrownBy(
                            () ->
                                    testHarness.processElement(
                                            new StreamRecord<>("Doesn't matter", 0)))
                    .satisfies(
                            throwable ->
                                    ExceptionUtils.assertThrowable(
                                            throwable, ExpectedTestException.class));
   ```
   






[GitHub] [flink] jhkkhtd commented on pull request #21998: [Flink-clients][api] Adding a method to get ClusterClientProvider

2023-02-23 Thread via GitHub


jhkkhtd commented on PR #21998:
URL: https://github.com/apache/flink/pull/21998#issuecomment-1442874077

   > Thanks @jhkkhtd. Firstly, You should open a [jira 
ticket](https://issues.apache.org/jira/projects/FLINK/issues) and describe 
clearly why you need this change and how you plan to change it. If the 
community thinks that this is a reasonable demand, someone will assign this 
ticket to you, and then you can start open the PR.
   
   Thanks, I have never used Jira before. I searched the Internet but couldn't 
find instructions for creating an account. How should I get a Jira account?





[jira] [Updated] (FLINK-31208) KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits)

2023-02-23 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-31208:
--
Description: 
KafkaSourceReader meaninglessly overrides a method (pauseOrResumeSplits) that 
is no different from its parent class (SourceReaderBase). Why not remove this 
override method?

 

The relevant code is here; as we can see, there is no difference:
{code:java}
//org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
        Collection<String> splitsToPause, Collection<String> splitsToResume) {
    splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
} 

//org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
        Collection<String> splitsToPause, Collection<String> splitsToResume) {
    splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
}{code}

  was:
KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits) 
,because no difference with its Parent class (SourceReaderBase). why not remove 
this override method?

 

Relative code is here, which we can see is no difference?
{code:java}
//org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
Collection splitsToPause, Collection splitsToResume) {
splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
} 

//org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
Collection splitsToPause, Collection splitsToResume) {
splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
}{code}


> KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits)
> ---
>
> Key: FLINK-31208
> URL: https://issues.apache.org/jira/browse/FLINK-31208
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Hongshun Wang
>Priority: Not a Priority
>
> KafkaSourceReader meaninglessly overrides a method (pauseOrResumeSplits) 
> that is no different from its parent class (SourceReaderBase). Why not 
> remove this override method?
>  
> The relevant code is here; as we can see, there is no difference:
> {code:java}
> //org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits
> @Override
> public void pauseOrResumeSplits(
>         Collection<String> splitsToPause, Collection<String> splitsToResume) {
>     splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
> } 
> //org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits
> @Override
> public void pauseOrResumeSplits(
>         Collection<String> splitsToPause, Collection<String> splitsToResume) {
>     splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31208) KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits)

2023-02-23 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-31208:
-

 Summary: KafkaSourceReader overrides meaninglessly a 
method(pauseOrResumeSplits)
 Key: FLINK-31208
 URL: https://issues.apache.org/jira/browse/FLINK-31208
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Hongshun Wang


KafkaSourceReader meaninglessly overrides a method (pauseOrResumeSplits), 
because there is no difference from its parent class (SourceReaderBase). Why not 
remove this override method?

 

The relevant code is here; as we can see, there is no difference:
{code:java}
//org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
Collection<String> splitsToPause, Collection<String> splitsToResume) {
splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
} 

//org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
Collection<String> splitsToPause, Collection<String> splitsToResume) {
splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
}{code}
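The redundancy can be sketched in plain Java. The class names below (`FetcherManager`, `SourceReaderBaseSketch`, `KafkaReaderSketch`) are simplified illustrative stand-ins, not Flink's real classes: an override whose body repeats the parent's delegation verbatim is behaviorally a no-op.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-in for Flink's SplitFetcherManager.
class FetcherManager {
    final List<String> paused = new ArrayList<>();

    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        paused.addAll(toPause);
        paused.removeAll(toResume);
    }
}

// Stand-in for SourceReaderBase: the parent already delegates.
class SourceReaderBaseSketch {
    protected final FetcherManager splitFetcherManager = new FetcherManager();

    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        splitFetcherManager.pauseOrResumeSplits(toPause, toResume);
    }
}

// Stand-in for KafkaSourceReader: this override repeats the parent's body
// verbatim, so deleting it would not change behavior.
class KafkaReaderSketch extends SourceReaderBaseSketch {
    @Override
    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        splitFetcherManager.pauseOrResumeSplits(toPause, toResume);
    }
}

public class RedundantOverrideDemo {
    public static void main(String[] args) {
        KafkaReaderSketch reader = new KafkaReaderSketch();
        reader.pauseOrResumeSplits(List.of("split-0"), List.of());
        System.out.println(reader.splitFetcherManager.paused); // prints [split-0]
    }
}
```

Deleting `KafkaReaderSketch.pauseOrResumeSplits` leaves the printed result unchanged, which is exactly why the ticket proposes removing the override.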



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116556431


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1085,27 +1081,27 @@ public void testNotifyCheckpointOnClosedOperator() 
throws Throwable {
 
 harness.streamTask.notifyCheckpointCompleteAsync(1);
 harness.streamTask.runMailboxStep();
-assertEquals(1, ClosingOperator.notified.get());
-assertFalse(ClosingOperator.closed.get());
+assertThat(ClosingOperator.notified.get()).isOne();

Review Comment:
   Thank you~. I updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116556097


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1121,13 +1117,14 @@ private void 
testFailToConfirmCheckpointMessage(Consumer> consu
 StreamTaskMailboxTestHarness harness =
 
builder.setupOutputForSingletonOperatorChain(streamMap).build();
 
-try {
-consumer.accept(harness.streamTask);
-harness.streamTask.runMailboxLoop();
-fail();
-} catch (ExpectedTestException expected) {
-// expected exceptionestProcessWithUnAvailableInput
-}
+// expected exceptionestProcessWithUnAvailableInput
+assertThatThrownBy(
+() -> {
+consumer.accept(harness.streamTask);
+harness.streamTask.runMailboxLoop();
+fail(null);

Review Comment:
   It's from the old line.
   And I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116555777


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() -> {
+testHarness.processElement(new 
StreamRecord<>("Doesn't matter", 0));
+throw new RuntimeException(

Review Comment:
   It's the old logic line from the migration, but redundant.
   These lines roll back to the original lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


RocMarshal commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116555051


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() -> {
+testHarness.processElement(new 
StreamRecord<>("Doesn't matter", 0));
+throw new RuntimeException(
+"Expected an exception but ran 
successfully");
+})
+.isInstanceOf(ExpectedTestException.class);
 
-try {
-testHarness.finishProcessing();
-} catch (Exception ex) {
-// todo: checking for suppression if there are more exceptions 
during cleanup
-ExceptionUtils.assertThrowable(ex, 
FailingTwiceOperator.CloseException.class);
-}
+// todo: checking for suppression if there are more exceptions 
during cleanup
+assertThatThrownBy(testHarness::finishProcessing)
+.isInstanceOf(FailingTwiceOperator.CloseException.class);

Review Comment:
   Nice catch~ I compared the `assertThrowable` logic with `isInstanceOf`; it's 
indeed as you described.
   I tried to find a logically equivalent API to replace it, but found none.
   
   So, I rolled back to `assertThrowable` with the original implementation. 
   Of course, I'd be happy to accept any helpful suggestions.
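   For context, the semantic difference can be sketched in plain Java, without AssertJ or Flink dependencies: an `assertThrowable`-style check walks the whole cause chain, while AssertJ's `isInstanceOf` inspects only the thrown exception itself. `findThrowable` below is an illustrative re-implementation in the spirit of Flink's `ExceptionUtils.findThrowable`, not the real method.

```java
import java.util.Optional;

public class CauseChainDemo {
    // Illustrative re-implementation in the spirit of Flink's
    // ExceptionUtils.findThrowable: walk the cause chain looking for a type.
    static Optional<Throwable> findThrowable(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return Optional.of(cur);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException("outer", new IllegalStateException("inner"));

        // What a top-level isInstanceOf-style check sees: only the thrown exception.
        System.out.println(wrapped instanceof IllegalStateException); // prints false

        // What an assertThrowable-style check does: search the cause chain.
        System.out.println(findThrowable(wrapped, IllegalStateException.class).isPresent()); // prints true
    }
}
```

   So the two assertions agree only when the expected type is the outermost exception; for wrapped causes they diverge.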



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-22484) Built-in functions for collections

2023-02-23 Thread jackylau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693015#comment-17693015
 ] 

jackylau commented on FLINK-22484:
--

hi [~Sergey Nuyanzin], how about adding 
map_entries/map_from_entries/map_from_arrays/map_contains_key functions, what 
do you think? The high order functions like transform need calcite support 
first
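As a hedged sketch of the semantics these proposed functions would have (modeled on Spark SQL's `map_from_arrays` and `map_contains_key`; the Java helpers below are illustrative stand-ins, not Flink's actual API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapFunctionsSketch {
    // map_from_arrays(keys, values): zip two equally sized lists into a map.
    static <K, V> Map<K, V> mapFromArrays(List<K> keys, List<V> values) {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            result.put(keys.get(i), values.get(i));
        }
        return result;
    }

    // map_contains_key(map, key): true if the map has an entry for key.
    static <K, V> boolean mapContainsKey(Map<K, V> map, K key) {
        return map.containsKey(key);
    }

    public static void main(String[] args) {
        Map<String, Integer> m = mapFromArrays(List.of("a", "b"), List.of(1, 2));
        System.out.println(m);                      // prints {a=1, b=2}
        System.out.println(mapContainsKey(m, "a")); // prints true
    }
}
```

These are plain per-row functions and need no planner support, unlike the higher-order functions mentioned above.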

> Built-in functions for collections
> --
>
> Key: FLINK-22484
> URL: https://issues.apache.org/jira/browse/FLINK-22484
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> A number of built-in functions for working with collections are supported by 
> other vendors. After looking at PostgreSQL, BigQuery, and Spark, a list of 
> more or less generic functions for collections was selected (for more details 
> see [1]).
> Feedback on the doc is welcome.
> [1] 
> [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing]
> MAP_KEYS
> MAP_VALUES
> MAP_FROM_ARRAYS



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


reswqa commented on code in PR #21999:
URL: https://github.com/apache/flink/pull/21999#discussion_r1116492529


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() -> {
+testHarness.processElement(new 
StreamRecord<>("Doesn't matter", 0));
+throw new RuntimeException(

Review Comment:
   Why throw an exception here?



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1085,27 +1081,27 @@ public void testNotifyCheckpointOnClosedOperator() 
throws Throwable {
 
 harness.streamTask.notifyCheckpointCompleteAsync(1);
 harness.streamTask.runMailboxStep();
-assertEquals(1, ClosingOperator.notified.get());
-assertFalse(ClosingOperator.closed.get());
+assertThat(ClosingOperator.notified.get()).isOne();

Review Comment:
   IIRC, AssertJ does have specific assertions for `AtomicXXX`, so please avoid 
using `get`.
   It is also necessary to check for similar problems across the whole test class.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -1121,13 +1117,14 @@ private void 
testFailToConfirmCheckpointMessage(Consumer> consu
 StreamTaskMailboxTestHarness harness =
 
builder.setupOutputForSingletonOperatorChain(streamMap).build();
 
-try {
-consumer.accept(harness.streamTask);
-harness.streamTask.runMailboxLoop();
-fail();
-} catch (ExpectedTestException expected) {
-// expected exceptionestProcessWithUnAvailableInput
-}
+// expected exceptionestProcessWithUnAvailableInput
+assertThatThrownBy(
+() -> {
+consumer.accept(harness.streamTask);
+harness.streamTask.runMailboxLoop();
+fail(null);

Review Comment:
   Why do we need `fail`?



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##
@@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput(
 "savepointResult");
 harness.processAll();
 
-Assert.assertEquals(expectEndInput, 
TestBoundedOneInputStreamOperator.isInputEnded());
+
assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput);
 }
 
 @Test
-public void testCleanUpExceptionSuppressing() throws Exception {
+void testCleanUpExceptionSuppressing() throws Exception {
 try (StreamTaskMailboxTestHarness testHarness =
 new 
StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
 .addInput(STRING_TYPE_INFO)
 .setupOutputForSingletonOperatorChain(new 
FailingTwiceOperator())
 .build()) {
 
-try {
-testHarness.processElement(new StreamRecord<>("Doesn't 
matter", 0));
-throw new RuntimeException("Expected an exception but ran 
successfully");
-} catch (Exception ex) {
-ExceptionUtils.assertThrowable(ex, 
ExpectedTestException.class);
-}
+assertThatThrownBy(
+() -> {
+testHarness.processElement(new 
StreamRecord<>("Doesn't matter", 0));
+throw new RuntimeException(
+"Expected an exception but ran 
successfully");
+})
+

[jira] [Updated] (FLINK-31207) Supports high order function like other engine

2023-02-23 Thread jackylau (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jackylau updated FLINK-31207:
-
Description: 
spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]

transform/transform_keys/transform_values 

after calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports high 
order functions, we should support many high order functions like spark/presto

  was:
spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]

after calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports high 
order functions, we should support many high order functions like spark/presto


> Supports high order function like other engine
> --
>
> Key: FLINK-31207
> URL: https://issues.apache.org/jira/browse/FLINK-31207
> Project: Flink
>  Issue Type: Sub-task
>Reporter: jackylau
>Priority: Major
>
> spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]
> transform/transform_keys/transform_values 
> after calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports 
> high order functions, we should support many high order functions like 
> spark/presto



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] zhuangchong opened a new pull request, #556: [hotfix] Add version reference in the pom

2023-02-23 Thread via GitHub


zhuangchong opened a new pull request, #556:
URL: https://github.com/apache/flink-table-store/pull/556

   Add a version reference for the `kafka` dependency


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-22091) env.java.home option didn't take effect in resource negotiator

2023-02-23 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693008#comment-17693008
 ] 

Samrat Deb commented on FLINK-22091:


I would like to work on this issue.

> env.java.home option didn't take effect in resource negotiator
> --
>
> Key: FLINK-22091
> URL: https://issues.apache.org/jira/browse/FLINK-22091
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.11.1, 1.12.2
>Reporter: zlzhang0122
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> If we have set the value of env.java.home in flink-conf.yaml, it will take 
> effect in standalone mode, but it won't take effect in resource negotiators 
> such as YARN, Kubernetes, etc. Maybe we can make some changes so it takes 
> effect?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31207) Supports high order function like other engine

2023-02-23 Thread jackylau (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jackylau updated FLINK-31207:
-
Summary: Supports high order function like other engine  (was: supports 
high order function like other engine)

> Supports high order function like other engine
> --
>
> Key: FLINK-31207
> URL: https://issues.apache.org/jira/browse/FLINK-31207
> Project: Flink
>  Issue Type: Sub-task
>Reporter: jackylau
>Priority: Major
>
> spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]
> after calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports 
> high order functions, we should support many high order functions like 
> spark/presto



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31207) supports high order function like other engine

2023-02-23 Thread jackylau (Jira)
jackylau created FLINK-31207:


 Summary: supports high order function like other engine
 Key: FLINK-31207
 URL: https://issues.apache.org/jira/browse/FLINK-31207
 Project: Flink
  Issue Type: Sub-task
Reporter: jackylau


spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]

after calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports high 
order functions, we should support many high order functions like spark/presto
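A minimal sketch of what a Spark-style `transform(array, func)` does, assuming the Spark SQL semantics linked above; this plain-Java helper is illustrative only, not Flink's planned API:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TransformSketch {
    // transform(array, func): apply func to every element, keeping order.
    static <T, R> List<R> transform(List<T> array, Function<T, R> func) {
        return array.stream().map(func).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Roughly: SELECT transform(array(1, 2, 3), x -> x + 1)  =>  [2, 3, 4]
        System.out.println(transform(List.of(1, 2, 3), x -> x + 1)); // prints [2, 3, 4]
    }
}
```

Supporting this in SQL requires the planner to parse and type-check the lambda argument, which is why the ticket depends on CALCITE-3679.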



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring

2023-02-23 Thread via GitHub


1996fanrui commented on PR #21999:
URL: https://github.com/apache/flink/pull/21999#issuecomment-1442822176

   Please squash these commits, and follow the rules that @reswqa mentioned 
before. 
   
   
   > This PR will eventually have two commits in the following order:
   > 1.[hotfix] Migrate StreamTaskTest to Junit5 and Assertj
   > 
2.[[FLINK-29816](https://issues.apache.org/jira/browse/FLINK-29816)][streaming] 
Fix the bug that StreamTask 
   
   And note the commit order: the hotfix commit should come before your commit; 
otherwise, you need to write the new test with JUnit 4 and then upgrade it in 
the next commit.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31196) Flink on YARN honors env.java.home

2023-02-23 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-31196:
--
Description: 
Flink on YARN, honor env.java.home, and launch JobManager and TaskManager 
containers with a configured env.java.home.

One option to set the JAVA_HOME for Flink JobManager, TaskManager running on 
YARN
{code:java}
containerized.master.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk
containerized.taskmanager.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk {code}
 

  was:Flink on YARN, honor env.java.home, and launch JobManager and 
TaskManager containers with a configured env.java.home.


> Flink on YARN honors env.java.home
> --
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.16.1
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink on YARN, honor env.java.home, and launch JobManager and TaskManager 
> containers with a configured env.java.home.
> One option to set the JAVA_HOME for Flink JobManager, TaskManager running on 
> YARN
> {code:java}
> containerized.master.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk
> containerized.taskmanager.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31196) Flink on YARN honors env.java.home

2023-02-23 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph resolved FLINK-31196.
---
Resolution: Duplicate

> Flink on YARN honors env.java.home
> --
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.16.1
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink on YARN, honor env.java.home, and launch JobManager and TaskManager 
> containers with a configured env.java.home.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31196) Flink on YARN honors env.java.home

2023-02-23 Thread Prabhu Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693003#comment-17693003
 ] 

Prabhu Joseph commented on FLINK-31196:
---

Thanks [~zlzhang0122] for the update. I missed searching the Flink Jira list 
properly before raising the ticket. We will close this as a duplicate and 
provide a patch for the original one, FLINK-22091.

> Flink on YARN honors env.java.home
> --
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.16.1
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink on YARN, honor env.java.home, and launch JobManager and TaskManager 
> containers with a configured env.java.home.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-20395) Migrate test_netty_shuffle_memory_control.sh

2023-02-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-20395:
--

Assignee: Weijie Guo

> Migrate test_netty_shuffle_memory_control.sh
> 
>
> Key: FLINK-20395
> URL: https://issues.apache.org/jira/browse/FLINK-20395
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network, Test Infrastructure
>Reporter: Matthias Pohl
>Assignee: Weijie Guo
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31206) Broken links on flink.apache.org

2023-02-23 Thread Zili Chen (Jira)
Zili Chen created FLINK-31206:
-

 Summary: Broken links on flink.apache.org
 Key: FLINK-31206
 URL: https://issues.apache.org/jira/browse/FLINK-31206
 Project: Flink
  Issue Type: Bug
Reporter: Zili Chen


Previously page link 
https://flink.apache.org/contribute/code-style-and-quality/preamble/ is broken, 
new link is 
https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/.

Shall we set up a redirect, or just leave those broken links for maintainers to 
fix?

cc [~martijnvisser]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31204) HiveCatalogITCase fails due to avro conflict in table store

2023-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31204.

Fix Version/s: table-store-0.4.0
 Assignee: Shammon
   Resolution: Fixed

master: 306a9ededbebc1d825cbb02c18338f5accf7faca

> HiveCatalogITCase fails due to avro conflict in table store
> ---
>
> Key: FLINK-31204
> URL: https://issues.apache.org/jira/browse/FLINK-31204
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Shammon
>Assignee: Shammon
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> Test fails in IDEA
>   at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>   ... 4 more
> Caused by: java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:203)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:172)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:55)
>   at 
> org.apache.flink.table.store.format.avro.AvroFileFormat$AvroGenericRecordBulkFormat.<init>(AvroFileFormat.java:95)
>   at 
> org.apache.flink.table.store.format.avro.AvroFileFormat.createReaderFactory(AvroFileFormat.java:80)
>   at 
> org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:71)
>   at 
> org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:67)
>   at 
> org.apache.flink.table.store.file.manifest.ManifestList$Factory.create(ManifestList.java:130)
>   at 
> org.apache.flink.table.store.file.operation.AbstractFileStoreScan.<init>(AbstractFileStoreScan.java:95)
>   at 
> org.apache.flink.table.store.file.operation.KeyValueFileStoreScan.<init>(KeyValueFileStoreScan.java:57)
>   at 
> org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:118)
>   at 
> org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:71)
>   at 
> org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:38)
>   at 
> org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:116)
>   at 
> org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:43)
>   at 
> org.apache.flink.table.store.table.AbstractFileStoreTable.newCommit(AbstractFileStoreTable.java:121)
>   at 
> org.apache.flink.table.store.connector.sink.FileStoreSink.lambda$createCommitterFactory$63124b4e$1(FileStoreSink.java:69)
>   at 
> org.apache.flink.table.store.connector.sink.CommitterOperator.initializeState(CommitterOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:750)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on pull request #20006: [FLINK-27415][Connectors / FileSystem] Read empty csv file throws exception in FileSystem table connector

2023-02-23 Thread via GitHub


wanglijie95 commented on PR #20006:
URL: https://github.com/apache/flink/pull/20006#issuecomment-1442775810

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi merged pull request #555: [FLINK-31204] Fix HiveCatalogITCase fails in IDEA

2023-02-23 Thread via GitHub


JingsongLi merged PR #555:
URL: https://github.com/apache/flink-table-store/pull/555


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 commented on pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


wanglijie95 commented on PR #21963:
URL: https://github.com/apache/flink/pull/21963#issuecomment-1442773339

   Thanks for the review @JunRuiLee @zhuzhurk. I've addressed all comments; 
please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


wanglijie95 commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1116480439


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -1060,14 +,97 @@ private void setVertexConfig(
 vertexConfigs.put(vertexID, config);
 }
 
+private void setChainedOutputsConfig(
+Integer vertexId, StreamConfig config, List 
chainableOutputs) {
+// iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
+for (StreamEdge edge : chainableOutputs) {
+if (edge.getOutputTag() != null) {
+config.setTypeSerializerSideOut(
+edge.getOutputTag(),
+edge.getOutputTag()
+.getTypeInfo()
+
.createSerializer(streamGraph.getExecutionConfig()));
+}
+}
+config.setChainedOutputs(chainableOutputs);
+}
+
+private void setOperatorNonChainedOutputsConfig(
+Integer vertexId,
+StreamConfig config,
+List nonChainableOutputs,
+Map outputsConsumedByEdge) {
+// iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
+for (StreamEdge edge : nonChainableOutputs) {
+if (edge.getOutputTag() != null) {
+config.setTypeSerializerSideOut(
+edge.getOutputTag(),
+edge.getOutputTag()
+.getTypeInfo()
+
.createSerializer(streamGraph.getExecutionConfig()));
+}
+}
+
+List deduplicatedOutputs =
+mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, 
outputsConsumedByEdge);
+config.setNumberOfOutputs(deduplicatedOutputs.size());
+config.setOperatorNonChainedOutputs(deduplicatedOutputs);
+}
+
+private void setVertexNonChainedOutputsConfig(
+Integer startNodeId,
+StreamConfig config,
+List<StreamEdge> transitiveOutEdges,
+final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+
+LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
+for (StreamEdge edge : transitiveOutEdges) {
+NonChainedOutput output = 
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
+transitiveOutputs.add(output);
+connect(startNodeId, edge, output);
+}
+
+config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
+}
+
+private void setAllOperatorNonChainedOutputsConfigs(
+final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+// set non chainable output config
+opNonChainableOutputsCache.forEach(
+(vertexId, nonChainableOutputs) -> {
+Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
+opIntermediateOutputs.computeIfAbsent(
+vertexId, ignored -> new HashMap<>());
+setOperatorNonChainedOutputsConfig(
+vertexId,
+vertexConfigs.get(vertexId),
+nonChainableOutputs,
+outputsConsumedByEdge);
+});
+}
+
+private void setAllVertexNonChainedOutputsConfigs(
+final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+jobVertices
+.keySet()
+.forEach(
+startNodeId -> {
+setVertexNonChainedOutputsConfig(
+startNodeId,
+vertexConfigs.get(startNodeId),
+
chainInfos.get(startNodeId).getTransitiveOutEdges(),
+opIntermediateOutputs);
+});

Review Comment:
   I'm wrong. It can be removed :)
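As a side note, the `LinkedHashSet` in `setVertexNonChainedOutputsConfig` quoted above deduplicates intermediate outputs that several out-edges share while keeping a deterministic insertion order. A minimal standalone illustration of that pattern (with plain strings standing in for `NonChainedOutput`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Several out-edges may map to the same intermediate output; a LinkedHashSet
// keeps exactly one copy of each while preserving first-seen order.
public class DedupSketch {
    public static void main(String[] args) {
        List<String> outputsPerEdge = Arrays.asList("out-1", "out-2", "out-1", "out-3");
        LinkedHashSet<String> deduplicated = new LinkedHashSet<>(outputsPerEdge);
        System.out.println(new ArrayList<>(deduplicated)); // prints [out-1, out-2, out-3]
    }
}
```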






[jira] [Updated] (FLINK-31107) Can't subscribe the non-persistent topics by using regex in Pulsar 2.11.0

2023-02-23 Thread Yufan Sheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yufan Sheng updated FLINK-31107:

Description: 
This is a ticket for tracking [the known 
issue|https://github.com/apache/pulsar/issues/19316] in Pulsar. Pulsar changed 
its internal logic for topic-pattern subscription and no longer returns any 
{{non-persistent}} topics. We will close this issue once Pulsar fixes this bug.

Another issue for tracking this BUG: 
https://github.com/apache/pulsar/issues/19493

  was:This is a ticket for tracking [the known 
issue|https://github.com/apache/pulsar/issues/19316] in Pulsar. Pulsar changes 
its internal logic for topic pattern subscribe and didn't return any 
{{non-persistent}} topics. We will close this issue after Pulsar fixes this bug 
in the future.


> Can't subscribe the non-persistent topics by using regex in Pulsar 2.11.0
> -
>
> Key: FLINK-31107
> URL: https://issues.apache.org/jira/browse/FLINK-31107
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.0.0
>Reporter: Yufan Sheng
>Priority: Minor
>
> This is a ticket for tracking [the known 
> issue|https://github.com/apache/pulsar/issues/19316] in Pulsar. Pulsar 
> changed its internal logic for topic-pattern subscription and no longer returns 
> any {{non-persistent}} topics. We will close this issue once Pulsar fixes this 
> bug.
> Another issue for tracking this BUG: 
> https://github.com/apache/pulsar/issues/19493



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


wanglijie95 commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1116477683


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -568,12 +592,7 @@ && isChainableInput(sourceOutEdge, streamGraph)) {
 final StreamConfig.SourceInputConfig inputConfig =
 new StreamConfig.SourceInputConfig(sourceOutEdge);
 final StreamConfig operatorConfig = new StreamConfig(new 
Configuration());
-setVertexConfig(
-sourceNodeId,
-operatorConfig,
-Collections.emptyList(),
-Collections.emptyList(),
-Collections.emptyMap());
+setOperatorConfig(sourceNodeId, operatorConfig, 
Collections.emptyMap());

Review Comment:
   Yes. I will modify it to the same logic as before
   
   






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


wanglijie95 commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1116477303


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroup.java:
##
@@ -53,9 +55,22 @@ public ForwardGroup(final Set<JobVertex> jobVertices) {
 .map(JobVertex::getParallelism)
 .collect(Collectors.toSet());
 
-checkState(decidedParallelisms.size() <= 1);
-if (decidedParallelisms.size() == 1) {
-this.parallelism = decidedParallelisms.iterator().next();
+checkState(configuredParallelisms.size() <= 1);
+if (configuredParallelisms.size() == 1) {
+this.parallelism = configuredParallelisms.iterator().next();
+}
+
+Set<Integer> configuredMaxParallelisms =
+jobVertices.stream()
+.map(JobVertex::getMaxParallelism)
+.filter(val -> val > 0)
+.collect(Collectors.toSet());
+
+if (!configuredMaxParallelisms.isEmpty()) {
+this.maxParallelism = Collections.min(configuredMaxParallelisms);
+checkState(
+maxParallelism >= parallelism,

Review Comment:
   I will change the check to `checkState(parallelism == -1 || maxParallelism 
>= parallelism)`
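   A self-contained sketch of that relaxed check (class and method names here are hypothetical, not the actual `ForwardGroup` code) could look like:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the forward-group max-parallelism decision discussed above:
// take the minimum of all configured (positive) max parallelisms, and only enforce
// maxParallelism >= parallelism when the parallelism has actually been decided (!= -1).
public class ForwardGroupSketch {

    static int decideMaxParallelism(List<Integer> vertexMaxParallelisms, int parallelism) {
        Set<Integer> configured = vertexMaxParallelisms.stream()
                .filter(v -> v > 0) // unconfigured vertices report a non-positive value
                .collect(Collectors.toSet());
        if (configured.isEmpty()) {
            return -1; // nothing configured, leave the group's max parallelism undecided
        }
        int maxParallelism = Collections.min(configured);
        // parallelism == -1 means "not yet decided", so the check must not fail then
        if (!(parallelism == -1 || maxParallelism >= parallelism)) {
            throw new IllegalStateException("max parallelism < decided parallelism");
        }
        return maxParallelism;
    }

    public static void main(String[] args) {
        // vertices configured with max parallelisms 8 and 4; parallelism undecided
        System.out.println(decideMaxParallelism(Arrays.asList(8, 4, -1), -1)); // prints 4
    }
}
```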






[GitHub] [flink] venkata91 commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-23 Thread via GitHub


venkata91 commented on PR #22009:
URL: https://github.com/apache/flink/pull/22009#issuecomment-1442766757

   Looking into the test failure; somehow it worked locally but is failing in the 
pipeline.





[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


wanglijie95 commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1116476622


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -928,14 +1011,8 @@ private StreamConfig createJobVertex(Integer 
streamNodeId, OperatorChainInfo cha
 return new StreamConfig(jobVertex.getConfiguration());

Review Comment:
   Done






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


wanglijie95 commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1116476465


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -1060,14 +,97 @@ private void setVertexConfig(
 vertexConfigs.put(vertexID, config);
 }
 
+private void setChainedOutputsConfig(
+Integer vertexId, StreamConfig config, List<StreamEdge> chainableOutputs) {
+// iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
+for (StreamEdge edge : chainableOutputs) {
+if (edge.getOutputTag() != null) {
+config.setTypeSerializerSideOut(
+edge.getOutputTag(),
+edge.getOutputTag()
+.getTypeInfo()
+
.createSerializer(streamGraph.getExecutionConfig()));
+}
+}
+config.setChainedOutputs(chainableOutputs);
+}
+
+private void setOperatorNonChainedOutputsConfig(
+Integer vertexId,
+StreamConfig config,
+List<StreamEdge> nonChainableOutputs,
+Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
+// iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
+for (StreamEdge edge : nonChainableOutputs) {
+if (edge.getOutputTag() != null) {
+config.setTypeSerializerSideOut(
+edge.getOutputTag(),
+edge.getOutputTag()
+.getTypeInfo()
+
.createSerializer(streamGraph.getExecutionConfig()));
+}
+}
+
+List<NonChainedOutput> deduplicatedOutputs =
+mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, 
outputsConsumedByEdge);
+config.setNumberOfOutputs(deduplicatedOutputs.size());
+config.setOperatorNonChainedOutputs(deduplicatedOutputs);
+}
+
+private void setVertexNonChainedOutputsConfig(
+Integer startNodeId,
+StreamConfig config,
+List<StreamEdge> transitiveOutEdges,
+final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+
+LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
+for (StreamEdge edge : transitiveOutEdges) {
+NonChainedOutput output = 
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
+transitiveOutputs.add(output);
+connect(startNodeId, edge, output);
+}
+
+config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
+}
+
+private void setAllOperatorNonChainedOutputsConfigs(
+final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+// set non chainable output config
+opNonChainableOutputsCache.forEach(
+(vertexId, nonChainableOutputs) -> {
+Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
+opIntermediateOutputs.computeIfAbsent(
+vertexId, ignored -> new HashMap<>());
+setOperatorNonChainedOutputsConfig(
+vertexId,
+vertexConfigs.get(vertexId),
+nonChainableOutputs,
+outputsConsumedByEdge);
+});
+}
+
+private void setAllVertexNonChainedOutputsConfigs(
+final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+jobVertices
+.keySet()
+.forEach(
+startNodeId -> {
+setVertexNonChainedOutputsConfig(
+startNodeId,
+vertexConfigs.get(startNodeId),
+
chainInfos.get(startNodeId).getTransitiveOutEdges(),
+opIntermediateOutputs);
+});

Review Comment:
   The {} is needed because `setVertexNonChainedOutputsConfig` has no return value.






[GitHub] [flink] wuchong commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader

2023-02-23 Thread via GitHub


wuchong commented on PR #21966:
URL: https://github.com/apache/flink/pull/21966#issuecomment-1442759082

   @stayrascal thank you for helping to verify it. 





[GitHub] [flink] lsyldliu commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader

2023-02-23 Thread via GitHub


lsyldliu commented on PR #21966:
URL: https://github.com/apache/flink/pull/21966#issuecomment-1442750732

   @stayrascal Many thanks for double-checking. I think it works well because it 
runs on the JM side.





[jira] [Closed] (FLINK-31117) Split flink connector to each module of each version

2023-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31117.

Resolution: Fixed

master: 37f75a85091697b75c6968293edfb060595adae3

> Split flink connector to each module of each version
> 
>
> Key: FLINK-31117
> URL: https://issues.apache.org/jira/browse/FLINK-31117
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> This will make compilation and testing much easier.





[GitHub] [flink-table-store] JingsongLi merged pull request #545: [FLINK-31117] Split flink connector to each module of each version

2023-02-23 Thread via GitHub


JingsongLi merged PR #545:
URL: https://github.com/apache/flink-table-store/pull/545





[GitHub] [flink] stayrascal commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader

2023-02-23 Thread via GitHub


stayrascal commented on PR #21966:
URL: https://github.com/apache/flink/pull/21966#issuecomment-1442721215

   @lsyldliu thanks a lot for looking at this issue. May I check whether we could use 
the Flink filesystem instead of the Hadoop filesystem to open the parquet file, so 
that the Flink fs can retrieve the configured credential info (or fs properties) from 
flink-conf.yaml automatically?





[GitHub] [flink-table-store] FangYongs commented on a diff in pull request #555: [FLINK-31204] Fix HiveCatalogITCase fails in IDEA

2023-02-23 Thread via GitHub


FangYongs commented on code in PR #555:
URL: https://github.com/apache/flink-table-store/pull/555#discussion_r1116445519


##
flink-table-store-hive/flink-table-store-hive-catalog/pom.xml:
##
@@ -535,6 +535,65 @@ under the License.
 <groupId>org.jamon</groupId>
 <artifactId>jamon-runtime</artifactId>
 </exclusion>
+<exclusion>
+<groupId>org.apache.hive</groupId>
+<artifactId>hive-exec</artifactId>
+</exclusion>
+</exclusions>
+</dependency>
+
+
+<dependency>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro</artifactId>
+<version>${avro.version}</version>
+<scope>test</scope>
+</dependency>
+
+<dependency>
+<groupId>org.apache.hive</groupId>
+<artifactId>hive-exec</artifactId>
+<version>${hive.version}</version>
+<scope>provided</scope>

Review Comment:
   Done






[jira] [Commented] (FLINK-22922) Migrate flink project website to Hugo

2023-02-23 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692982#comment-17692982
 ] 

Zili Chen commented on FLINK-22922:
---

Hi [~martijnvisser]! I see we now use `asf-site` for both dev and deploy. How 
can we ensure the website is always rebuilt, or do we do it manually for now?

> Migrate flink project website to Hugo
> -
>
> Key: FLINK-22922
> URL: https://issues.apache.org/jira/browse/FLINK-22922
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Chesnay Schepler
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> Hugo is working like a charm for the Flink documentation. To reduce the 
> number of software stacks, and massively reduce friction when building the 
> current Flink website, we should migrate the Flink website to hugo as well.





[GitHub] [flink] Mulavar commented on pull request #21545: [FLINK-30396][table]make alias hint take effect in correlate

2023-02-23 Thread via GitHub


Mulavar commented on PR #21545:
URL: https://github.com/apache/flink/pull/21545#issuecomment-1442717916

   @flinkbot run azure





[jira] [Updated] (FLINK-31204) HiveCatalogITCase fails due to avro conflict in table store

2023-02-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31204:
---
Labels: pull-request-available  (was: )

> HiveCatalogITCase fails due to avro conflict in table store
> ---
>
> Key: FLINK-31204
> URL: https://issues.apache.org/jira/browse/FLINK-31204
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Shammon
>Priority: Major
>  Labels: pull-request-available
>
> Test fails in IDEA
>   at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>   ... 4 more
> Caused by: java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:203)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:172)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
>   at 
> org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:55)
>   at 
> org.apache.flink.table.store.format.avro.AvroFileFormat$AvroGenericRecordBulkFormat.<init>(AvroFileFormat.java:95)
>   at 
> org.apache.flink.table.store.format.avro.AvroFileFormat.createReaderFactory(AvroFileFormat.java:80)
>   at 
> org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:71)
>   at 
> org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:67)
>   at 
> org.apache.flink.table.store.file.manifest.ManifestList$Factory.create(ManifestList.java:130)
>   at 
> org.apache.flink.table.store.file.operation.AbstractFileStoreScan.<init>(AbstractFileStoreScan.java:95)
>   at 
> org.apache.flink.table.store.file.operation.KeyValueFileStoreScan.<init>(KeyValueFileStoreScan.java:57)
>   at 
> org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:118)
>   at 
> org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:71)
>   at 
> org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:38)
>   at 
> org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:116)
>   at 
> org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:43)
>   at 
> org.apache.flink.table.store.table.AbstractFileStoreTable.newCommit(AbstractFileStoreTable.java:121)
>   at 
> org.apache.flink.table.store.connector.sink.FileStoreSink.lambda$createCommitterFactory$63124b4e$1(FileStoreSink.java:69)
>   at 
> org.apache.flink.table.store.connector.sink.CommitterOperator.initializeState(CommitterOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:750)





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #555: [FLINK-31204] Fix HiveCatalogITCase fails in IDEA

2023-02-23 Thread via GitHub


JingsongLi commented on code in PR #555:
URL: https://github.com/apache/flink-table-store/pull/555#discussion_r111644


##
flink-table-store-hive/flink-table-store-hive-catalog/pom.xml:
##
@@ -535,6 +535,65 @@ under the License.
 <groupId>org.jamon</groupId>
 <artifactId>jamon-runtime</artifactId>
 </exclusion>
+<exclusion>
+<groupId>org.apache.hive</groupId>
+<artifactId>hive-exec</artifactId>
+</exclusion>
+</exclusions>
+</dependency>
+
+
+<dependency>
+<groupId>org.apache.avro</groupId>
+<artifactId>avro</artifactId>
+<version>${avro.version}</version>
+<scope>test</scope>
+</dependency>
+
+<dependency>
+<groupId>org.apache.hive</groupId>
+<artifactId>hive-exec</artifactId>
+<version>${hive.version}</version>
+<scope>provided</scope>

Review Comment:
   test?






[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2023-02-23 Thread Ran Tao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692979#comment-17692979
 ] 

Ran Tao commented on FLINK-12450:
-

[~jark] Hi Jark, can you assign this ticket to me? I'd be glad to work on it.

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Priority: Major
>  Labels: auto-unassigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right





[jira] [Commented] (FLINK-12449) [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and SQL

2023-02-23 Thread Ran Tao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692978#comment-17692978
 ] 

Ran Tao commented on FLINK-12449:
-

[~lzljs3620320] Hi Jingsong, can you assign this ticket to me? I'd be glad to 
work on it.

> [Bitwise Functions] Add BIT_AND,  BIT_OR functions supported in Table API and 
> SQL
> -
>
> Key: FLINK-12449
> URL: https://issues.apache.org/jira/browse/FLINK-12449
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Priority: Major
>  Labels: auto-unassigned
>
> Bitwise AND.
> eg. SELECT BIT_AND(29,15), returns 13 
> Bitwise OR
> eg. SELECT BIT_OR(29 ,15), returns 31
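
The expected results in the ticket can be double-checked with Java's built-in bitwise operators:

```java
// Verify the example results from the ticket: 29 = 0b11101, 15 = 0b01111.
public class BitwiseSketch {
    public static void main(String[] args) {
        System.out.println(29 & 15); // prints 13 (0b01101)
        System.out.println(29 | 15); // prints 31 (0b11111)
    }
}
```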





[GitHub] [flink-table-store] JingsongLi merged pull request #553: Unify the version of `kafka` in pom.

2023-02-23 Thread via GitHub


JingsongLi merged PR #553:
URL: https://github.com/apache/flink-table-store/pull/553





[GitHub] [flink] flinkbot commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-23 Thread via GitHub


flinkbot commented on PR #22009:
URL: https://github.com/apache/flink/pull/22009#issuecomment-1442699380

   
   ## CI report:
   
   * c7e58124ab18ef98248a279359ee9014071847dc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] lsyldliu commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader

2023-02-23 Thread via GitHub


lsyldliu commented on PR #21966:
URL: https://github.com/apache/flink/pull/21966#issuecomment-1442697375

   @wuchong Thanks for reviewing. Regarding your question, we can't verify the 
configuration in ParquetReader now because the `ParquetFileReader` doesn't 
contain the credential info. The credential info is only used when creating the 
hadoop FileSystem. I have checked the related code that creates the FileSystem: 
it loads the hadoop conf from flink-conf.yaml first and then creates the 
corresponding FileSystem; refer to 
https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java#L126
   





[jira] [Commented] (FLINK-31109) Fails with proxy user not supported even when security.kerberos.fetch.delegation-token is set to false

2023-02-23 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692973#comment-17692973
 ] 

Venkata krishnan Sowrirajan commented on FLINK-31109:
-

[~martijnvisser] Sure, I have posted a PR with the fix after discussing with 
[~gaborgsomogyi] . Please take a look whenever you get a chance. Thanks.

> Fails with proxy user not supported even when 
> security.kerberos.fetch.delegation-token is set to false
> --
>
> Key: FLINK-31109
> URL: https://issues.apache.org/jira/browse/FLINK-31109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: Venkata krishnan Sowrirajan
>Assignee: Venkata krishnan Sowrirajan
>Priority: Blocker
>  Labels: pull-request-available
>
> With
> {code:java}
> security.kerberos.fetch.delegation-token: false
> {code}
> and delegation tokens obtained through our internal service which sets both 
> HADOOP_TOKEN_FILE_LOCATION to pick up the DTs and also sets the 
> HADOOP_PROXY_USER which fails with the below error
> {code:java}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/export/home/vsowrira/flink-1.18-SNAPSHOT/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/export/apps/hadoop/hadoop-bin_2100503/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> org.apache.flink.runtime.security.modules.SecurityModule$SecurityInstallException:
>  Unable to set the Hadoop login user
>   at 
> org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:106)
>   at 
> org.apache.flink.runtime.security.SecurityUtils.installModules(SecurityUtils.java:76)
>   at 
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:57)
>   at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1188)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.UnsupportedOperationException: Proxy user is not 
> supported
>   at 
> org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider.throwProxyUserNotSupported(KerberosLoginProvider.java:137)
>   at 
> org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider.isLoginPossible(KerberosLoginProvider.java:81)
>   at 
> org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:73)
>   ... 4 more
> {code}
> This seems to have gotten changed after 
> [480e6edf|https://github.com/apache/flink/commit/480e6edf9732f8334ef7576080fdbfc98051cb28]
>  ([FLINK-28330][runtime][security] Remove old delegation token framework code)
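
A self-contained sketch of the guard this ticket is about (the class and method names here are hypothetical, not the actual `KerberosLoginProvider` API): a configured `HADOOP_PROXY_USER` should only be rejected when Flink itself is asked to fetch delegation tokens, not unconditionally.

```java
// Hypothetical sketch of the proposed guard: reject a proxy user only when the
// caller does not support it (e.g. when Flink itself must fetch delegation tokens).
public class ProxyUserGuardSketch {

    static boolean isLoginPossible(boolean supportProxyUser, String hadoopProxyUser,
                                   boolean hasKeytabOrTicketCache) {
        if (hadoopProxyUser != null && !supportProxyUser) {
            // this is the path that made CliFrontend fail even with
            // security.kerberos.fetch.delegation-token: false
            throw new UnsupportedOperationException("Proxy user is not supported");
        }
        return hadoopProxyUser != null || hasKeytabOrTicketCache;
    }

    public static void main(String[] args) {
        // proxy user set, token fetching disabled -> login is allowed to proceed
        System.out.println(isLoginPossible(true, "alice", false)); // prints true
    }
}
```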





[GitHub] [flink] venkata91 commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-23 Thread via GitHub


venkata91 commented on PR #22009:
URL: https://github.com/apache/flink/pull/22009#issuecomment-1442696039

   Please review. cc @MartijnVisser @gaborgsomogyi @becketqin





[jira] [Updated] (FLINK-31109) Fails with proxy user not supported even when security.kerberos.fetch.delegation-token is set to false

2023-02-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31109:
---
Labels: pull-request-available  (was: )

> Fails with proxy user not supported even when 
> security.kerberos.fetch.delegation-token is set to false
> --
>
> Key: FLINK-31109
> URL: https://issues.apache.org/jira/browse/FLINK-31109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: Venkata krishnan Sowrirajan
>Assignee: Venkata krishnan Sowrirajan
>Priority: Blocker
>  Labels: pull-request-available
>
> With
> {code:java}
> security.kerberos.fetch.delegation-token: false
> {code}
> and delegation tokens obtained through our internal service which sets both 
> HADOOP_TOKEN_FILE_LOCATION to pick up the DTs and also sets the 
> HADOOP_PROXY_USER which fails with the below error
> {code:java}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/export/home/vsowrira/flink-1.18-SNAPSHOT/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/export/apps/hadoop/hadoop-bin_2100503/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> org.apache.flink.runtime.security.modules.SecurityModule$SecurityInstallException:
>  Unable to set the Hadoop login user
>   at 
> org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:106)
>   at 
> org.apache.flink.runtime.security.SecurityUtils.installModules(SecurityUtils.java:76)
>   at 
> org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:57)
>   at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1188)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.UnsupportedOperationException: Proxy user is not 
> supported
>   at 
> org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider.throwProxyUserNotSupported(KerberosLoginProvider.java:137)
>   at 
> org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider.isLoginPossible(KerberosLoginProvider.java:81)
>   at 
> org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:73)
>   ... 4 more
> {code}
> This seems to have gotten changed after 
> [480e6edf|https://github.com/apache/flink/commit/480e6edf9732f8334ef7576080fdbfc98051cb28]
>  ([FLINK-28330][runtime][security] Remove old delegation token framework code)





[GitHub] [flink] venkata91 opened a new pull request, #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…

2023-02-23 Thread via GitHub


venkata91 opened a new pull request, #22009:
URL: https://github.com/apache/flink/pull/22009

   …etch is disabled
   
   
   
   ## What is the purpose of the change
   
   FLINK-28330 removed the old delegation token framework code; as part of that, 
it also removed the existing support for delegation tokens that are managed 
outside of Flink.
   
   ## Brief change log
   
   - As part of `YarnClusterDescriptor#setTokensFor`, load the delegation tokens 
available on the client machine, and then load the tokens obtained through 
`DelegationTokenManager` if `security.delegation.tokens.enabled` is set to true.
   - `HadoopModule` should throw an exception if `UserGroupInformation.currentUser` 
is a Hadoop proxy user and `security.delegation.tokens.enabled` is also set to 
true.
   
   ## Verifying this change
   
   - Added tests in the `HadoopModuleTest` and in `KerberosLoginProviderITCase`.
   - Manually tested in our env where delegation token fetch is managed.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Created] (FLINK-31205) do optimize for multi sink in a single relNode tree

2023-02-23 Thread Aitozi (Jira)
Aitozi created FLINK-31205:
--

 Summary: do optimize for multi sink in a single relNode tree 
 Key: FLINK-31205
 URL: https://issues.apache.org/jira/browse/FLINK-31205
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


Flink supports multi-sink usage, but it optimizes each sink in an individual 
RelNode tree. This misses some opportunities for cross-tree optimization, e.g.:


{code:java}
create table newX(
  a int,
  b bigint,
  c varchar,
  d varchar,
  e varchar
) with (
  'connector' = 'values'
  ,'enable-projection-push-down' = 'true'
)


insert into sink_table select a, b from newX
insert into sink_table select a, 1 from newX
{code}

It will produce the plan below, which causes the source to be consumed twice:


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, 
b], metadata=[]]], fields=[a, b])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Calc(select=[a, 1 AS b])
   +- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a], metadata=[]]], fields=[a])

{code}

In this ticket, I propose to do a global optimization for the multi sink by:
* Merge the multi sinks (writing to the same table) into a single RelNode tree 
with an extra union node
* After optimization, split the merged union back into the original multi sinks

In my PoC, after step 1, it produces the plan below, which I think will benefit 
the overall performance:


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Union(all=[true], union=[a, b])
   :- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
   +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
  +- Reused(reference_id=[1])
{code}
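The merge-then-split idea can be illustrated with a toy model of plan nodes. 
This is only a sketch under the assumption of heavily simplified node classes; 
it is not Calcite's or Flink's actual RelNode API:

```java
import java.util.List;

public class MergeSketch {
    // Hypothetical, heavily simplified plan nodes; not Flink's planner API.
    interface Node {}
    record Scan(String table) implements Node {}
    record Calc(String expr, Node input) implements Node {}
    record Union(List<Node> inputs) implements Node {}
    record Sink(String table, Node input) implements Node {}

    // Step 1: merge two sinks on the same table under a single Union,
    // so the optimizer sees one tree and can reuse the shared scan.
    static Sink merge(Sink a, Sink b) {
        return new Sink(a.table(), new Union(List.of(a.input(), b.input())));
    }

    // Step 2: after optimization, split the Union back into per-sink trees.
    static List<Sink> split(Sink merged) {
        Union u = (Union) merged.input();
        return u.inputs().stream().map(in -> new Sink(merged.table(), in)).toList();
    }

    public static void main(String[] args) {
        Sink s1 = new Sink("sink_table", new Scan("newX"));
        Sink s2 = new Sink("sink_table", new Calc("a, 1 AS b", new Scan("newX")));
        System.out.println(split(merge(s1, s2)).size()); // prints 2
    }
}
```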








[GitHub] [flink] zlzhang0122 commented on pull request #20183: [FLINK-24306][table]Group by index throw SqlValidatorException

2023-02-23 Thread via GitHub


zlzhang0122 commented on PR #20183:
URL: https://github.com/apache/flink/pull/20183#issuecomment-1442682491

   @wuchong Could you pls help to confirm and review this?





[jira] [Assigned] (FLINK-31201) Provides option to sort partition for full stage in streaming read

2023-02-23 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-31201:


Assignee: Jingsong Lee

> Provides option to sort partition for full stage in streaming read
> --
>
> Key: FLINK-31201
> URL: https://issues.apache.org/jira/browse/FLINK-31201
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> The overall order may be out of order due to writes to old partitions. We 
> can provide an option to sort the full reading stage by partition fields to 
> avoid the disorder.
> (Actually, it is currently out of order across partitions. Because a HashMap 
> is used, we may be able to sort according to the creation time of the first 
> file?)





[jira] [Commented] (FLINK-31196) Flink on YARN honors env.java.home

2023-02-23 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692966#comment-17692966
 ] 

zlzhang0122 commented on FLINK-31196:
-

Maybe a duplicate of this? 
[FLINK-22091|https://issues.apache.org/jira/browse/FLINK-22091]

> Flink on YARN honors env.java.home
> --
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.16.1
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink on YARN should honor env.java.home and launch the JobManager and 
> TaskManager containers with the configured env.java.home.





[jira] [Updated] (FLINK-31201) Provides option to sort partition for full stage in streaming read

2023-02-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31201:
---
Labels: pull-request-available  (was: )

> Provides option to sort partition for full stage in streaming read
> --
>
> Key: FLINK-31201
> URL: https://issues.apache.org/jira/browse/FLINK-31201
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> The overall order may be out of order due to writes to old partitions. We 
> can provide an option to sort the full reading stage by partition fields to 
> avoid the disorder.
> (Actually, it is currently out of order across partitions. Because a HashMap 
> is used, we may be able to sort according to the creation time of the first 
> file?)





[GitHub] [flink-table-store] JingsongLi opened a new pull request, #554: [FLINK-31201] Scan to generate splits should care about file order

2023-02-23 Thread via GitHub


JingsongLi opened a new pull request, #554:
URL: https://github.com/apache/flink-table-store/pull/554

   Currently, partitions are out of order because a HashMap is used; we may be 
able to use a LinkedHashMap instead, so that the order follows the creation 
time of the first file per partition.
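   The HashMap vs LinkedHashMap difference at the heart of this change can be 
shown in isolation (the partition names below are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionOrderDemo {
    // LinkedHashMap iterates in insertion order, while HashMap gives no
    // ordering guarantee at all. Inserting partitions in the creation-time
    // order of their first file therefore makes iteration deterministic.
    static List<String> orderedPartitions() {
        Map<String, Long> firstFileCreationTime = new LinkedHashMap<>();
        firstFileCreationTime.put("dt=2023-02-21", 100L);
        firstFileCreationTime.put("dt=2023-02-22", 200L);
        firstFileCreationTime.put("dt=2023-02-23", 300L);
        return List.copyOf(firstFileCreationTime.keySet());
    }

    public static void main(String[] args) {
        // Iteration order equals insertion order.
        System.out.println(orderedPartitions());
        // prints [dt=2023-02-21, dt=2023-02-22, dt=2023-02-23]
    }
}
```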





[jira] [Created] (FLINK-31204) HiveCatalogITCase fails due to avro conflict in table store

2023-02-23 Thread Shammon (Jira)
Shammon created FLINK-31204:
---

 Summary: HiveCatalogITCase fails due to avro conflict in table 
store
 Key: FLINK-31204
 URL: https://issues.apache.org/jira/browse/FLINK-31204
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Shammon


Test fails in IDEA

at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 4 more
Caused by: java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
at 
org.apache.flink.table.store.format.avro.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:203)
at 
org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:172)
at 
org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
at 
org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
at 
org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:55)
at 
org.apache.flink.table.store.format.avro.AvroFileFormat$AvroGenericRecordBulkFormat.<init>(AvroFileFormat.java:95)
at 
org.apache.flink.table.store.format.avro.AvroFileFormat.createReaderFactory(AvroFileFormat.java:80)
at 
org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:71)
at 
org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:67)
at 
org.apache.flink.table.store.file.manifest.ManifestList$Factory.create(ManifestList.java:130)
at 
org.apache.flink.table.store.file.operation.AbstractFileStoreScan.<init>(AbstractFileStoreScan.java:95)
at 
org.apache.flink.table.store.file.operation.KeyValueFileStoreScan.<init>(KeyValueFileStoreScan.java:57)
at 
org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:118)
at 
org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:71)
at 
org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:38)
at 
org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:116)
at 
org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:43)
at 
org.apache.flink.table.store.table.AbstractFileStoreTable.newCommit(AbstractFileStoreTable.java:121)
at 
org.apache.flink.table.store.connector.sink.FileStoreSink.lambda$createCommitterFactory$63124b4e$1(FileStoreSink.java:69)
at 
org.apache.flink.table.store.connector.sink.CommitterOperator.initializeState(CommitterOperator.java:104)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
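When debugging a `NoSuchMethodError` caused by conflicting dependency versions 
like this, a quick check is to ask the JVM which jar the offending class was 
actually loaded from. A minimal sketch (using `String.class` as the probe, since 
Avro may not be on this sketch's classpath):

```java
public class WhichJar {
    // Returns the code-source location of a class, i.e. the jar or directory
    // it was loaded from; classes on the bootstrap classpath have none.
    static String locationOf(Class<?> cls) {
        var src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "bootstrap classpath" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        // For a real conflict, pass e.g. org.apache.avro.Schema.class here.
        System.out.println(locationOf(String.class)); // prints bootstrap classpath
    }
}
```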






[GitHub] [flink] rkhachatryan commented on pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on PR #21981:
URL: https://github.com/apache/flink/pull/21981#issuecomment-1442587044

   Thanks for the feedback, @zentol and @dmvk .
   I've updated the PR, would you mind taking another look?
   
   I've significantly restructured the code after the offline discussions.
   Probably a good place to start are the final versions of `SlotAllocator` and 
`SlotAssigner` interfaces.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116364006


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java:
##
@@ -54,6 +54,18 @@ public interface SlotAllocator {
 Optional determineParallelism(
 JobInformation jobInformation, Collection 
slots);
 
+/**
+ * Same as {@link #determineParallelism(JobInformation, Collection)} but 
additionally determine
+ * assignment of slots to execution slot sharing groups.
+ */
+default Optional

Review Comment:
   After an offline discussion, I restructured the interfaces 
(4fa93e3756fad77cf2dbc92bf77614aa39cc28fe..2fc84d83884c1f342d46fec02d961d4f73eb4219).
   
   This particular comment is addressed by 
4fa93e3756fad77cf2dbc92bf77614aa39cc28fe:
   ```
   Restructure 1/5: change types passed between AdaptiveScheduler and 
SlotAssigner
   
   Slot assignments are computed and consumed by SlotAllocator.
   This is expressed implicitly by implementing VertexParallelism.
   
   This change tries to make that clear, while still allowing to assign
   slots to something other than slot sharing groups.
   
   It does so by:
   1. Introduce JobSchedulingPlan, computed and consumed by SlotAllocator. It 
couples VertexParallelism with slot assignments.
   2. Push the polymorphism of state assignments from VertexParallelism to slot 
assignment target in JobSchedulingPlan.
   ```
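The coupling described in the commit message can be sketched with minimal types. 
The shapes below are hypothetical, assuming only the names mentioned above 
(`JobSchedulingPlan`, `VertexParallelism`, slot assignments); they are not 
Flink's actual interfaces:

```java
import java.util.List;
import java.util.Map;

public class SchedulingPlanSketch {
    // Hypothetical stand-ins: the plan couples the chosen per-vertex
    // parallelism with the slots assigned to each sharing group.
    record VertexParallelism(Map<String, Integer> parallelismPerVertex) {}
    record SlotAssignment(String slotId, String executionSlotSharingGroup) {}
    record JobSchedulingPlan(VertexParallelism parallelism, List<SlotAssignment> assignments) {}

    public static void main(String[] args) {
        JobSchedulingPlan plan =
                new JobSchedulingPlan(
                        new VertexParallelism(Map.of("map", 2)),
                        List.of(new SlotAssignment("slot-1", "group-0")));
        // One object computed and consumed by the SlotAllocator side.
        System.out.println(plan.assignments().size()); // prints 1
    }
}
```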
   






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116362299


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map> locality;
+private final Map maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+
ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map> locality,
+Map maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection slots, 
Collection groups) {
+
+final Map parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116362006


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map> locality;
+private final Map maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+
ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map> locality,
+Map maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection slots, 
Collection groups) {
+
+final Map parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116360889


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map> locality;
+private final Map maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+
ExecutionJobVertex::getMaxParallelism)));

Review Comment:
   Fixed by addressing the [comment 
above](https://github.com/apache/flink/pull/21981#discussion_r1112862609).






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116359613


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map<JobVertexID, Map<AllocationID, Integer>> locality;
+private final Map<JobVertexID, Integer> maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+
ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map<JobVertexID, Map<AllocationID, Integer>> locality,
+Map<JobVertexID, Integer> maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection<? extends SlotInfo> slots, Collection<ExecutionSlotSharingGroup> groups) {
+
+final Map<JobVertexID, Integer> parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116359318


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map<JobVertexID, Map<AllocationID, Integer>> locality;
+private final Map<JobVertexID, Integer> maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+
ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map<JobVertexID, Map<AllocationID, Integer>> locality,
+Map<JobVertexID, Integer> maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection<? extends SlotInfo> slots, Collection<ExecutionSlotSharingGroup> groups) {
+
+final Map<JobVertexID, Integer> parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116358878


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java:
##
@@ -163,7 +165,12 @@ CompletableFuture<String> goToStopWithSavepoint(
 /** Interface covering transition to the {@link WaitingForResources} 
state. */
 interface ToWaitingForResources extends StateTransitions {
 
-/** Transitions into the {@link WaitingForResources} state. */
+/**
+ * Transitions into the {@link WaitingForResources} state without 
{@link ExecutionGraph}
+ * (e.g. after creation).
+ */
 void goToWaitingForResources();
+/** Transitions into the {@link WaitingForResources} state (e.g. after 
restarting). */
+void goToWaitingForResources(ExecutionGraph executionGraph);

Review Comment:
   Removed in 3760a182422db9a7b7cabf420f170346617455fa.






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116358657


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map<JobVertexID, Map<AllocationID, Integer>> locality;
+private final Map<JobVertexID, Integer> maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+
ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map<JobVertexID, Map<AllocationID, Integer>> locality,
+Map<JobVertexID, Integer> maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection<? extends SlotInfo> slots, Collection<ExecutionSlotSharingGroup> groups) {
+
+final Map<JobVertexID, Integer> parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116358477


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.StreamSupport;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+private static class AllocationScore implements Comparable<AllocationScore> {
+
+private final String group;
+private final AllocationID allocationId;
+
+public AllocationScore(String group, AllocationID allocationId, int 
score) {
+this.group = group;
+this.allocationId = allocationId;
+this.score = score;
+}
+
+private final int score;
+
+public String getGroup() {
+return group;
+}
+
+public AllocationID getAllocationId() {
+return allocationId;
+}
+
+public int getScore() {
+return score;
+}
+
+@Override
+public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+int result = Integer.compare(score, other.score);
+if (result != 0) {
+return result;
+}
+result = other.allocationId.compareTo(allocationId);
+if (result != 0) {
+return result;
+}
+return other.group.compareTo(group);
+}
+}
+
+private final Map<JobVertexID, Map<AllocationID, Integer>> locality;
+private final Map<JobVertexID, Integer> maxParallelism;
+
+public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) {
+this(
+calculateLocalKeyGroups(archivedExecutionGraph),
+StreamSupport.stream(
+
archivedExecutionGraph.getVerticesTopologically().spliterator(),
+false)
+.collect(
+toMap(
+ExecutionJobVertex::getJobVertexId,
+
ExecutionJobVertex::getMaxParallelism)));
+}
+
+public StateLocalitySlotAssigner(
+Map<JobVertexID, Map<AllocationID, Integer>> locality,
+Map<JobVertexID, Integer> maxParallelism) {
+this.locality = locality;
+this.maxParallelism = maxParallelism;
+}
+
+@Override
+public AssignmentResult assignSlots(
+Collection<? extends SlotInfo> slots, Collection<ExecutionSlotSharingGroup> groups) {
+
+final Map<JobVertexID, Integer> parallelism = new HashMap<>();
+groups.forEach(
+group ->
+group.getContainedExecutionVertices()
+.forEach(
+evi ->
+parallelism.merge(
+

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116356436


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##
@@ -794,7 +802,23 @@ public void goToWaitingForResources() {
 LOG,
 desiredResources,
 this.initialResourceAllocationTimeout,
-this.resourceStabilizationTimeout));
+this.resourceStabilizationTimeout,
+null));
+}
+
+@Override
+public void goToWaitingForResources(ExecutionGraph executionGraph) {

Review Comment:
   I agree, `state.as` seems error-prone to me. Initial implementation had an 
error related to that (state was never the one expected).
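
   The concern about a blind `state.as` cast can be illustrated with a small plain-JDK sketch (class names are hypothetical, not Flink's actual scheduler states): if the scheduler is no longer in the expected state, an explicit runtime check yields an empty `Optional` instead of a `ClassCastException`.

```java
import java.util.Optional;

// Toy model (hypothetical class names, not Flink's actual scheduler states)
// showing why an explicit runtime type check is safer than a blind cast:
// if the state already changed, the Optional is simply empty instead of a
// ClassCastException being thrown.
public class StateNarrowing {

    interface State {}

    static class WaitingForResources implements State {}

    static class Executing implements State {}

    /** Returns the state narrowed to the requested type only if it matches. */
    static <T extends State> Optional<T> as(Class<T> clazz, State state) {
        return clazz.isInstance(state) ? Optional.of(clazz.cast(state)) : Optional.empty();
    }

    public static void main(String[] args) {
        State current = new Executing();
        System.out.println(as(WaitingForResources.class, current).isPresent()); // false
        System.out.println(as(Executing.class, current).isPresent()); // true
    }
}
```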






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler

2023-02-23 Thread via GitHub


rkhachatryan commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116355057


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##
@@ -121,16 +133,13 @@ public Optional<VertexParallelismWithSlotSharing> determineParallelism(
 slotSharingGroupParallelism.get(
 slotSharingGroup.getSlotSharingGroupId()));
 
-final Iterable<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
+final List<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
 createExecutionSlotSharingGroups(vertexParallelism);
 
-for (ExecutionSlotSharingGroup executionSlotSharingGroup :
-sharedSlotToVertexAssignment) {
-final SlotInfo slotInfo = slotIterator.next();
-
-assignments.add(
-new 
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
-}
+SlotAssigner.AssignmentResult result =
+slotAssigner.assignSlots(freeSlots, 
sharedSlotToVertexAssignment);
+assignments.addAll(result.assignments);
+freeSlots = result.remainingSlots;

Review Comment:
   Fixed by restructuring the code and going through all the groups in one go: 
2fc84d83884c1f342d46fec02d961d4f73eb4219 .. 
4fa93e3756fad77cf2dbc92bf77614aa39cc28fe.
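
   The one-pass restructuring described above can be pictured as a greedy pass over precomputed (group, slot) scores; the sketch below is simplified and uses illustrative names, not the PR's actual API.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

// Simplified, self-contained sketch of score-based assignment: compute a
// score for every (group, slot) pair up front, then greedily take the
// best-scoring pairs, skipping groups and slots that are already taken.
public class GreedyAssignment {

    record Score(String group, String slot, int score) {}

    static Map<String, String> assign(List<Score> scores, Set<String> freeSlots) {
        PriorityQueue<Score> queue =
                new PriorityQueue<>(Comparator.comparingInt(Score::score).reversed());
        queue.addAll(scores);
        Map<String, String> assignment = new HashMap<>();
        Set<String> remaining = new HashSet<>(freeSlots);
        while (!queue.isEmpty()) {
            Score best = queue.poll();
            // Assign only if this group has no slot yet and the slot is still free.
            if (!assignment.containsKey(best.group()) && remaining.remove(best.slot())) {
                assignment.put(best.group(), best.slot());
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, String> result =
                assign(
                        List.of(
                                new Score("g1", "s1", 10),
                                new Score("g2", "s1", 7),
                                new Score("g2", "s2", 5)),
                        Set.of("s1", "s2"));
        // g1 wins s1 with the top score, so g2 falls back to s2.
        System.out.println(result.get("g1")); // s1
        System.out.println(result.get("g2")); // s2
    }
}
```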









[GitHub] [flink] gunnarmorling commented on pull request #21648: [FLINK-30636] [docs]: Typo fix; 'to to' -> 'to'

2023-02-23 Thread via GitHub


gunnarmorling commented on PR #21648:
URL: https://github.com/apache/flink/pull/21648#issuecomment-1442297939

   @rmetzger, could we get this one merged?





[GitHub] [flink-kubernetes-operator] mxm merged pull request #537: [docs] Use Prometheus factory class in metric reporter examples

2023-02-23 Thread via GitHub


mxm merged PR #537:
URL: https://github.com/apache/flink-kubernetes-operator/pull/537





[GitHub] [flink-web] dannycranmer commented on a diff in pull request #611: Flink 1.15.4

2023-02-23 Thread via GitHub


dannycranmer commented on code in PR #611:
URL: https://github.com/apache/flink-web/pull/611#discussion_r1116091205


##
docs/content/posts/2023-02-23-release-1.15.4.md:
##
@@ -0,0 +1,152 @@
+---
+authors:
+- fapaul: null
+  name: Danny Cranmer
+date: "2023-02-23T17:00:00Z"
+excerpt: The Apache Flink Community is pleased to announce a bug fix release 
for Flink
+  1.15.
+title: Apache Flink 1.15.4 Release Announcement
+aliases:
+- /news/2023/02/23/release-1.15.4.html
+---
+
+The Apache Flink Community is pleased to announce the third bug fix release of 
the Flink 1.15 series.

Review Comment:
   Thanks, I missed that.






[GitHub] [flink-web] snuyanzin commented on a diff in pull request #611: Flink 1.15.4

2023-02-23 Thread via GitHub


snuyanzin commented on code in PR #611:
URL: https://github.com/apache/flink-web/pull/611#discussion_r1116071377


##
docs/content/posts/2023-02-23-release-1.15.4.md:
##
@@ -0,0 +1,152 @@
+---
+authors:
+- fapaul: null
+  name: Danny Cranmer
+date: "2023-02-23T17:00:00Z"
+excerpt: The Apache Flink Community is pleased to announce a bug fix release 
for Flink
+  1.15.
+title: Apache Flink 1.15.4 Release Announcement
+aliases:
+- /news/2023/02/23/release-1.15.4.html
+---
+
+The Apache Flink Community is pleased to announce the third bug fix release of 
the Flink 1.15 series.

Review Comment:
   ```suggestion
   The Apache Flink Community is pleased to announce the fourth bug fix release 
of the Flink 1.15 series.
   ```
   According to 
https://github.com/apache/flink-web/blob/147d1c0d5823006d360d410d771cf55c18d02e53/docs/content/posts/2022-11-10-release-1.15.3.md?plain=1#L13
   1.15.3 was the third, or did I miss something?






[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-02-23 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1116007389


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -31,24 +31,28 @@
 
 import java.util.Optional;
 
-import static 
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
-
 /** Specific {@link ArgumentTypeStrategy} for {@link 
BuiltInFunctionDefinitions#ARRAY_CONTAINS}. */
 @Internal
 class ArrayElementArgumentTypeStrategy implements ArgumentTypeStrategy {
 
+private final boolean preserveNullability;
+
+public ArrayElementArgumentTypeStrategy(boolean preserveNullability) {
+this.preserveNullability = preserveNullability;
+}
+
 @Override
 public Optional<DataType> inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
 final LogicalType haystackElementType = haystackType.getElementType();
-final LogicalType needleType =
-
callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();
-if (supportsImplicitCast(needleType, haystackElementType)) {

Review Comment:
   I didn't get: check for implicit cast should be removed? why?






[jira] [Assigned] (FLINK-27954) JobVertexFlameGraphHandler does not work on standby Dispatcher

2023-02-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-27954:
--

Assignee: Weijie Guo

> JobVertexFlameGraphHandler does not work on standby Dispatcher
> --
>
> Key: FLINK-27954
> URL: https://issues.apache.org/jira/browse/FLINK-27954
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / REST
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.17.0, 1.16.2, 1.15.5
>
>
> The {{JobVertexFlameGraphHandler}} relies internally on the 
> {{JobVertexThreadInfoTracker}} which calls 
> {{ResourceManagerGateway#requestTaskExecutorThreadInfoGateway}} to get a 
> gateway for requesting the thread info from the task executors. Since this 
> gateway is not serializable it would categorically fail if called from a 
> standby dispatcher.
> Instead this should follow the logic of the {{MetricFetcherImpl}}, which 
> requests addresses instead and manually connects to the task executors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] mxm opened a new pull request, #537: [docs] Use Prometheus factory class in metric reporter examples

2023-02-23 Thread via GitHub


mxm opened a new pull request, #537:
URL: https://github.com/apache/flink-kubernetes-operator/pull/537

   Using the reporter class directly is not supported anymore with Flink 1.16.
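
   For reference, the switch being documented can be sketched in `flink-conf.yaml`; the reporter name `prom` is illustrative.

```yaml
# Flink 1.15 and earlier (no longer supported in 1.16):
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

# Flink 1.16+: configure the reporter via its factory
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
```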
   





[jira] [Commented] (FLINK-27954) JobVertexFlameGraphHandler does not work on standby Dispatcher

2023-02-23 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692801#comment-17692801
 ] 

Weijie Guo commented on FLINK-27954:


Thanks [~chesnay] for reporting this and giving a fix suggestion; I'd like to do 
this.

> JobVertexFlameGraphHandler does not work on standby Dispatcher
> --
>
> Key: FLINK-27954
> URL: https://issues.apache.org/jira/browse/FLINK-27954
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / REST
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.17.0, 1.16.2, 1.15.5
>
>
> The {{JobVertexFlameGraphHandler}} relies internally on the 
> {{JobVertexThreadInfoTracker}} which calls 
> {{ResourceManagerGateway#requestTaskExecutorThreadInfoGateway}} to get a 
> gateway for requesting the thread info from the task executors. Since this 
> gateway is not serializable it would categorically fail if called from a 
> standby dispatcher.
> Instead this should follow the logic of the {{MetricFetcherImpl}}, which 
> requests addresses instead and manually connects to the task executors.





[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-02-23 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1116011807


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java:
##
@@ -71,7 +71,11 @@ public final class SpecificInputTypeStrategies {
 
 /** Argument type derived from the array element type. */
 public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG =
-new ArrayElementArgumentTypeStrategy();
+new ArrayElementArgumentTypeStrategy(false);
+
+/** Argument type derived from the array element type. But leaves 
nullability untouched. */

Review Comment:
   That seems not true.
   Example: array[1, 2, 3] has nullability `NOT NULL`, yet 
`ARRAY_ELEMENT_ARG_NULLABLE` makes it nullable. So it looks like it does not 
leave nullability "untouched".






[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-02-23 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1116007389


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -31,24 +31,28 @@
 
 import java.util.Optional;
 
-import static 
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
-
 /** Specific {@link ArgumentTypeStrategy} for {@link 
BuiltInFunctionDefinitions#ARRAY_CONTAINS}. */
 @Internal
 class ArrayElementArgumentTypeStrategy implements ArgumentTypeStrategy {
 
+private final boolean preserveNullability;
+
+public ArrayElementArgumentTypeStrategy(boolean preserveNullability) {
+this.preserveNullability = preserveNullability;
+}
+
 @Override
 public Optional inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
 final LogicalType haystackElementType = haystackType.getElementType();
-final LogicalType needleType =
-
callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();
-if (supportsImplicitCast(needleType, haystackElementType)) {

Review Comment:
   I didn't get it: should the check for implicit cast be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-31183) Flink Kinesis EFO Consumer can fail to stop gracefully

2023-02-23 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-31183.
---
Resolution: Fixed

> Flink Kinesis EFO Consumer can fail to stop gracefully
> --
>
> Key: FLINK-31183
> URL: https://issues.apache.org/jira/browse/FLINK-31183
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.3, 1.16.1, aws-connector-4.0.0
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.4, aws-connector-4.1.0, 1.16.2
>
>
> *Background*
> When stopping a Flink job using the stop-with-savepoint API the EFO Kinesis 
> source can fail to close gracefully.
>  
> Sample stack trace
> {code:java}
> 2023-02-16 20:45:40
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
>   at 
> org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013)
>   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>   at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name 
> with subtask : Source: vas_source_stream (38/48)#0 Failure reason: Task has 
> failed.
>   at 
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1395)
>   at 
> org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1338)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
> Caused by: java.util.concurrent.CompletionException: 
> java.util.concurrent.RejectedExecutionException: event executor terminated
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
>   ... 3 more
> Caused by: java.util.concurrent.RejectedExecutionException: event executor 
> terminated
>   at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
>   at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
>   at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
>   at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
>   at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
>   at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher$ChannelSubscription.cancel(HandlerPublisher.java:502)
>   at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.cancel(DelegatingSubscription.java:37)
>   at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription.cancel(Http2ResetSendingSubscription.java:41)
>   at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.cancel(DelegatingSubscription.java:37)
>   at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$OnCancelSubscription.cancel(ResponseHandler.java:409)
>   at 
> 

[jira] [Comment Edited] (FLINK-31183) Flink Kinesis EFO Consumer can fail to stop gracefully

2023-02-23 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692651#comment-17692651
 ] 

Danny Cranmer edited comment on FLINK-31183 at 2/23/23 5:05 PM:


Merged commit 
[{{a8c34db}}|https://github.com/apache/flink/commit/a8c34db3d601c534f92e68a2709a6467eb94276e]
 into apache:release-1.15 

Merged commit 
[{{cd7b049}}|https://github.com/apache/flink/commit/cd7b0495bcdadc3a9808a475be819c9808d5f17e]
 into apache:release-1.16 

Merged commit 
[{{fdfe982}}|https://github.com/apache/flink-connector-aws/commit/fdfe9821b36027e9afd8db4d32ac8eff080dad2d]
 into apache:main 


was (Author: dannycranmer):
Merged commit 
[{{a8c34db}}|https://github.com/apache/flink/commit/a8c34db3d601c534f92e68a2709a6467eb94276e]
 into apache:release-1.15 

Merged commit 
[{{fdfe982}}|https://github.com/apache/flink-connector-aws/commit/fdfe9821b36027e9afd8db4d32ac8eff080dad2d]
 into apache:main 


[GitHub] [flink] dannycranmer merged pull request #22008: [FLINK-31183][Connector/Kinesis] Fix bug where EFO Consumer can fail …

2023-02-23 Thread via GitHub


dannycranmer merged PR #22008:
URL: https://github.com/apache/flink/pull/22008


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-02-23 Thread via GitHub


zhuzhurk commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1115923261


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -928,14 +1011,8 @@ private StreamConfig createJobVertex(Integer 
streamNodeId, OperatorChainInfo cha
 return new StreamConfig(jobVertex.getConfiguration());
 }
 
-private void setVertexConfig(
-Integer vertexID,
-StreamConfig config,
-List chainableOutputs,
-List nonChainableOutputs,
-Map chainedSources) {
-
-tryConvertPartitionerForDynamicGraph(chainableOutputs, 
nonChainableOutputs);
+private void setOperatorConfig(
+Integer vertexID, StreamConfig config, Map chainedSources) {

Review Comment:
   vertexID -> vertexId



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroup.java:
##
@@ -53,9 +55,22 @@ public ForwardGroup(final Set jobVertices) {
 .map(JobVertex::getParallelism)
 .collect(Collectors.toSet());
 
-checkState(decidedParallelisms.size() <= 1);
-if (decidedParallelisms.size() == 1) {
-this.parallelism = decidedParallelisms.iterator().next();
+checkState(configuredParallelisms.size() <= 1);
+if (configuredParallelisms.size() == 1) {
+this.parallelism = configuredParallelisms.iterator().next();
+}
+
+Set configuredMaxParallelisms =
+jobVertices.stream()
+.map(JobVertex::getMaxParallelism)
+.filter(val -> val > 0)
+.collect(Collectors.toSet());
+
+if (!configuredMaxParallelisms.isEmpty()) {
+this.maxParallelism = Collections.min(configuredMaxParallelisms);
+checkState(
+maxParallelism >= parallelism,

Review Comment:
   The `parallelism` can be -1, while the `maxParallelism` is configured.
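As a rough illustration of the constraint under discussion (the group's max parallelism is the minimum of the configured ones, with -1 meaning "not decided"), here is a minimal sketch; it is not the actual `ForwardGroup` code:

```java
import java.util.Arrays;
import java.util.OptionalInt;

/** Illustrative sketch of the forward-group constraint; -1 = not decided. */
public class ForwardGroupSketch {
    // Group max parallelism: the minimum of all positive configured values,
    // or -1 when no vertex in the group has one configured.
    static int minConfiguredMaxParallelism(int[] maxParallelisms) {
        OptionalInt min = Arrays.stream(maxParallelisms).filter(p -> p > 0).min();
        return min.orElse(-1);
    }

    static void check(int parallelism, int maxParallelism) {
        // The check must tolerate an undecided parallelism (-1): only a
        // decided parallelism can be validated against the group maximum.
        if (parallelism > 0 && maxParallelism > 0 && maxParallelism < parallelism) {
            throw new IllegalStateException("max parallelism < parallelism");
        }
    }

    public static void main(String[] args) {
        int max = minConfiguredMaxParallelism(new int[] {128, 64, -1});
        System.out.println(max); // prints 64
        check(-1, max);          // undecided parallelism: must not throw
    }
}
```

This is exactly the case the review points out: `parallelism` may still be -1 when `maxParallelism` is already configured, so the `checkState` must guard for that.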



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -758,6 +771,76 @@ private List createChain(
 }
 }
 
+private void setVertexParallelismsForDynamicGraphIfNecessary() {

Review Comment:
   Better to add some comments to explain what this method is for, e.g. it 
ensures that the parallelism and maxParallelism of vertices in the same forward 
group are the same, and sets the parallelism at an early stage if possible, to 
avoid invalid partition reuse.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -1955,6 +2091,14 @@ private OperatorID addNodeToChain(int currentNodeId, 
String operatorName) {
 return new OperatorID(primaryHashBytes);
 }
 
+private void setTransitiveOutEdges(final List 
transitiveOutEdges) {
+this.transitiveOutEdges.addAll(transitiveOutEdges);
+}
+
+public List getTransitiveOutEdges() {

Review Comment:
   ```suggestion
   private List getTransitiveOutEdges() {
   ```



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -568,12 +592,7 @@ && isChainableInput(sourceOutEdge, streamGraph)) {
 final StreamConfig.SourceInputConfig inputConfig =
 new StreamConfig.SourceInputConfig(sourceOutEdge);
 final StreamConfig operatorConfig = new StreamConfig(new 
Configuration());
-setVertexConfig(
-sourceNodeId,
-operatorConfig,
-Collections.emptyList(),
-Collections.emptyList(),
-Collections.emptyMap());
+setOperatorConfig(sourceNodeId, operatorConfig, 
Collections.emptyMap());

Review Comment:
   IIUC, `setChainedOutputs(emptyList)` was previously called here, but is no 
longer called after this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


