[GitHub] [flink] JunRuiLee commented on pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
JunRuiLee commented on PR #21963: URL: https://github.com/apache/flink/pull/21963#issuecomment-1443025904 LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
gaborgsomogyi commented on PR #22009: URL: https://github.com/apache/flink/pull/22009#issuecomment-1443023755 The e2e tests are still running, so most probably it's going to pass.
[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
gaborgsomogyi commented on code in PR #22009: URL: https://github.com/apache/flink/pull/22009#discussion_r1116617650 ## flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java: ## @@ -70,10 +71,22 @@ public void install() throws SecurityInstallException { try { KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig); -if (kerberosLoginProvider.isLoginPossible()) { -kerberosLoginProvider.doLogin(); +if (kerberosLoginProvider.isLoginPossible(true)) { +kerberosLoginProvider.doLogin(true); loginUser = UserGroupInformation.getLoginUser(); +if (HadoopUserUtils.isProxyUser(loginUser)) Review Comment: I think it's good to add this in general so as not to change the original behavior. I'm just writing down a possible future consideration, so no change is needed for now. This pointed me to an edge case where a user wants to use proxy-user support with non-Hadoop delegation tokens like S3. The original implementation does not allow this, but maybe there is a need for it. I need some more time to consider this in depth. Later on we might remove this check if use cases call for it. ## flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java: ## @@ -57,7 +58,7 @@ public void isLoginPossibleMustReturnFalseByDefault() throws IOException { UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); -assertFalse(kerberosLoginProvider.isLoginPossible()); +assertFalse(kerberosLoginProvider.isLoginPossible(true)); Review Comment: These original tests cover the non-proxy cases. I would either set the new boolean to false in all non-proxy cases, or test both the proxy and non-proxy cases (true and false) to be 100% sure that the original behavior is covered. It's your choice; since we're under time pressure, I would vote for the first.
## flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java: ## @@ -99,6 +101,8 @@ public void doLogin() throws IOException { LOG.info("Attempting to load user's ticket cache"); UserGroupInformation.loginUserFromSubject(null); LOG.info("Loaded user's ticket cache successfully"); +} else if (supportProxyUser) { +LOG.info("Delegation token fetch is managed and therefore login is managed"); Review Comment: Maybe we can say something like: `Proxy user doesn't need login since it must have credentials already`. I think `KerberosLoginProvider` shouldn't know about tokens at all.
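The review thread above hinges on one behavioral rule: a proxy user already carries credentials, so the provider should skip the Kerberos login path for it. A minimal, hypothetical sketch of that decision logic follows; the class, enum, and method names are illustrative and are not Flink's actual API.

```java
// Hypothetical sketch of the login decision discussed in the review above.
// A proxy user doesn't need login since it must have credentials already;
// only keytab and ticket-cache users go through an actual Kerberos login.
public class ProxyLoginSketch {
    public enum AuthMethod { KERBEROS_KEYTAB, TICKET_CACHE, PROXY, NONE }

    public static String decide(AuthMethod method, boolean supportProxyUser) {
        switch (method) {
            case KERBEROS_KEYTAB:
                return "login with keytab";
            case TICKET_CACHE:
                return "load user's ticket cache";
            case PROXY:
                if (supportProxyUser) {
                    // Proxy user doesn't need login: credentials are already present.
                    return "skip login";
                }
                // Mirrors the original behavior when proxy users are not supported.
                throw new IllegalStateException("Proxy user is not supported");
            default:
                return "no login possible";
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(AuthMethod.PROXY, true));
        System.out.println(decide(AuthMethod.KERBEROS_KEYTAB, false));
    }
}
```

This also illustrates the test concern above: every branch should be exercised with the proxy flag both true and false to be sure the pre-existing behavior is unchanged.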
[GitHub] [flink] flinkbot commented on pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
flinkbot commented on PR #22010: URL: https://github.com/apache/flink/pull/22010#issuecomment-1443013003 ## CI report: * 5f4a9ee58a62b1b2e4d540ea6293554638722e53 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] xuzhiwen1255 commented on pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
xuzhiwen1255 commented on PR #22010: URL: https://github.com/apache/flink/pull/22010#issuecomment-1442997250 @reswqa Could you review it for me? Thank you.
[jira] [Commented] (FLINK-30583) Provide the flame graph to the subtask level
[ https://issues.apache.org/jira/browse/FLINK-30583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693055#comment-17693055 ] Piotr Nowojski commented on FLINK-30583: Thanks for this improvement! I've just found it in the WebUI and docs :) > Provide the flame graph to the subtask level > > > Key: FLINK-30583 > URL: https://issues.apache.org/jira/browse/FLINK-30583 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST, Runtime / Web Frontend >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Fix For: 1.17.0 > > > This is an umbrella Jira about providing the flame graph to the subtask level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80
[ https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-27115. -- Resolution: Fixed master(1.18) via 67bd4ba7f300749c4831b37df525b82a76d3c36c release-1.17 via 6170615ed0aa3495c374cce8d33b0ea272b4bccc > Error while executing BLOB connection. java.io.IOException: Unknown operation > 80 > > > Key: FLINK-27115 > URL: https://issues.apache.org/jira/browse/FLINK-27115 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: zhiyuan >Assignee: Weihua Hu >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > Attachments: image-2023-02-23-11-14-16-479.png, > image-2023-02-23-11-30-39-406.png > > > hi, > I have a Flink SQL job running online. Every morning it reports the error shown in the attached screenshot. I have enabled the debug log to investigate and found nothing. > version 1.12 > !image attachment (unreadable Outlook URL removed)!
[jira] [Updated] (FLINK-31192) dataGen takes too long to initialize under sequence
[ https://issues.apache.org/jira/browse/FLINK-31192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31192: --- Labels: pull-request-available (was: ) > dataGen takes too long to initialize under sequence > --- > > Key: FLINK-31192 > URL: https://issues.apache.org/jira/browse/FLINK-31192 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: xzw0223 >Assignee: xzw0223 >Priority: Major > Labels: pull-request-available > > The SequenceGenerator preloads all sequence values in open(). If the totalElement number is too large, this takes too long. > [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java#L91] > The reason is that the Deque's capacity is doubled whenever it fills up, which requires a time-consuming array copy. > > Here's what I propose: > do not preload the full amount of data for the sequence; generate one value each time next is called, to solve the slow initialization caused by loading everything up front. > record the currently emitted sequence position in the checkpoint, and continue sending data from the recorded position after an abnormal restart to ensure fault tolerance
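The proposal above (generate values on demand and checkpoint only the position) can be sketched as a small self-contained class. This is a hypothetical illustration: the class and method names do not reflect Flink's actual SequenceGenerator API.

```java
import java.util.NoSuchElementException;

// Hypothetical sketch of the proposed fix: instead of preloading every
// sequence value into a Deque in open(), keep only the current position and
// compute the next value on demand. The position is all a checkpoint needs.
public class LazySequence {
    private final long end;  // inclusive upper bound of the sequence
    private long position;   // next value to emit; this is what gets checkpointed

    public LazySequence(long start, long end) {
        this.end = end;
        this.position = start;
    }

    public boolean hasNext() {
        return position <= end;
    }

    // O(1) per element, no up-front allocation proportional to totalElement.
    public long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return position++;
    }

    // On checkpoint, snapshot the position; on restore, resume from it.
    public long snapshotPosition() {
        return position;
    }

    public void restorePosition(long p) {
        this.position = p;
    }

    public static void main(String[] args) {
        // Construction is instant even for a huge range, unlike preloading.
        LazySequence seq = new LazySequence(1, 3);
        while (seq.hasNext()) {
            System.out.println(seq.next());
        }
    }
}
```

Restoring from a snapshotted position reproduces the remainder of the sequence exactly, which is the fault-tolerance property the ticket asks for.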
[GitHub] [flink] xuzhiwen1255 opened a new pull request, #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
xuzhiwen1255 opened a new pull request, #22010: URL: https://github.com/apache/flink/pull/22010 ## What is the purpose of the change In sequence mode, dataGen preloads all sequence values during initialization, which can take a long time. ## Brief change log - Replace preloading with generating one sequence value each time the next method is called - The checkpoint no longer saves all sequence values, only the position of the sequence value the task is currently emitting ## Verifying this change `DataGeneratorSourceTest` and `DataGenTableSourceFactoryTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
[jira] [Updated] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80
[ https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-27115: --- Affects Version/s: 1.17.0 > Error while executing BLOB connection. java.io.IOException: Unknown operation > 80
[jira] [Updated] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80
[ https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-27115: --- Fix Version/s: 1.17.0 > Error while executing BLOB connection. java.io.IOException: Unknown operation > 80
[jira] [Assigned] (FLINK-27115) Error while executing BLOB connection. java.io.IOException: Unknown operation 80
[ https://issues.apache.org/jira/browse/FLINK-27115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-27115: -- Assignee: Weihua Hu > Error while executing BLOB connection. java.io.IOException: Unknown operation > 80
[GitHub] [flink] reswqa merged pull request #22004: [FLINK-27115][blob] Enrich the error log with remote address for blobServer
reswqa merged PR #22004: URL: https://github.com/apache/flink/pull/22004
[GitHub] [flink] reswqa commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
reswqa commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116565034 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); +assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> { +testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); +throw new RuntimeException( +"Expected an exception but ran successfully"); +}) +.isInstanceOf(ExpectedTestException.class); -try { -testHarness.finishProcessing(); -} catch (Exception ex) { -// todo: checking for suppression if there are more exceptions during cleanup -ExceptionUtils.assertThrowable(ex, FailingTwiceOperator.CloseException.class); -} +// todo: checking for suppression if there are more exceptions during cleanup +assertThatThrownBy(testHarness::finishProcessing) +.isInstanceOf(FailingTwiceOperator.CloseException.class); Review Comment: We don't need to go back to the original implementation. You can see my last comment.
[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit
[ https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693022#comment-17693022 ] Yanfei Lei commented on FLINK-31089: Let me try to summarize this issue: # Enabling PinL0FilterAndIndexBlocksInCache or PinTopLevelIndexAndFilter with TTL disabled will result in OOM ## PinTopLevelIndexAndFilter can significantly affect the performance. ## PinL0FilterAndIndexBlocksInCache will NOT affect the performance. # With PinL0FilterAndIndexBlocksInCache or PinTopLevelIndexAndFilter enabled and TTL enabled, the memory wouldn't keep growing. ## Due to https://issues.apache.org/jira/browse/FLINK-22957 , TTL can't take effect for the Rank operator in Flink 1.13. Is the TTL set by "table.exec.state.ttl"? If the job is a DataStream job, maybe you can set TTL for the rank operator via StateTtlConfig. > pin L0 index in memory can lead to slow memory grow finally lead to memory > beyond limit > --- > > Key: FLINK-31089 > URL: https://issues.apache.org/jira/browse/FLINK-31089 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.1 >Reporter: xiaogang zhou >Priority: Major > Attachments: image-2023-02-15-20-26-58-604.png, > image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, > l0pin_open.png > > > With setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned memory kept growing (in the picture below, from 48G to 50G in about 5 hours). But if we switch it to false, the pinned memory stays relatively static. In our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit > !image-2023-02-15-20-26-58-604.png|width=899,height=447! > > !image-2023-02-15-20-32-17-993.png|width=853,height=464! > the two graphs were recorded yesterday and today, which means the number of records per second does not differ a lot.
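The StateTtlConfig suggestion above would look roughly like the following fragment in a DataStream job. This is a hedged configuration sketch, not a recommendation from the thread: the TTL duration, cleanup threshold, and the state descriptor name ("rank-state") are placeholders to be adapted to the actual job.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Hedged sketch: TTL value and descriptor name are placeholders.
StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(Time.hours(1))
                // refresh the TTL on writes so live entries are retained
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // never hand back an expired value to the user function
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // let RocksDB's compaction filter drop expired entries,
                // which is what keeps the pinned memory from growing unbounded
                .cleanupInRocksdbCompactFilter(1000)
                .build();

ValueStateDescriptor<Long> descriptor =
        new ValueStateDescriptor<>("rank-state", Long.class);
descriptor.enableTimeToLive(ttlConfig);
```

With "table.exec.state.ttl" unavailable to the Rank operator (per FLINK-22957 above), attaching a per-descriptor TTL like this is the DataStream-level workaround being suggested.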
[GitHub] [flink] reswqa commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
reswqa commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116563643 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -368,17 +368,19 @@ void testCleanUpExceptionSuppressing() throws Exception { .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -assertThatThrownBy( -() -> { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException( -"Expected an exception but ran successfully"); -}) -.isInstanceOf(ExpectedTestException.class); +try { +testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); +throw new RuntimeException("Expected an exception but ran successfully"); +} catch (Exception ex) { +ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); +} Review Comment: ```suggestion Assertions.assertThatThrownBy( () -> testHarness.processElement( new StreamRecord<>("Doesn't matter", 0))) .satisfies( throwable -> ExceptionUtils.assertThrowable( throwable, ExpectedTestException.class)); ```
[GitHub] [flink] jhkkhtd commented on pull request #21998: [Flink-clients][api] Adding a method to get ClusterClientProvider
jhkkhtd commented on PR #21998: URL: https://github.com/apache/flink/pull/21998#issuecomment-1442874077 > Thanks @jhkkhtd. Firstly, you should open a [jira ticket](https://issues.apache.org/jira/projects/FLINK/issues) and describe clearly why you need this change and how you plan to change it. If the community thinks that this is a reasonable demand, someone will assign this ticket to you, and then you can open the PR. Thanks, I have never used Jira. I searched the Internet but couldn't find instructions for creating an account. How can I get a Jira account?
[jira] [Updated] (FLINK-31208) KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits)
[ https://issues.apache.org/jira/browse/FLINK-31208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hongshun Wang updated FLINK-31208: -- Description: KafkaSourceReader needlessly overrides a method (pauseOrResumeSplits) that is identical to the one in its parent class (SourceReaderBase). Why not remove this override? The relevant code is below; as you can see, there is no difference: {code:java} //org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits @Override public void pauseOrResumeSplits( Collection<String> splitsToPause, Collection<String> splitsToResume) { splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); } //org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits @Override public void pauseOrResumeSplits( Collection<String> splitsToPause, Collection<String> splitsToResume) { splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); }{code} was: KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits) ,because no difference with its Parent class (SourceReaderBase). why not remove this override method? Relative code is here, which we can see is no difference? {code:java} //org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits @Override public void pauseOrResumeSplits( Collection<String> splitsToPause, Collection<String> splitsToResume) { splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); } //org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits @Override public void pauseOrResumeSplits( Collection<String> splitsToPause, Collection<String> splitsToResume) { splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); }{code} > KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits) > --- > > Key: FLINK-31208 > URL: https://issues.apache.org/jira/browse/FLINK-31208 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Hongshun Wang >Priority: Not a Priority > > KafkaSourceReader needlessly overrides a method (pauseOrResumeSplits) that is identical to the one in its parent class (SourceReaderBase). Why not remove this override? > > The relevant code is below; as you can see, there is no difference: > {code:java} > //org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits > @Override > public void pauseOrResumeSplits( > Collection<String> splitsToPause, Collection<String> splitsToResume) { > splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); > } > //org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits > @Override > public void pauseOrResumeSplits( > Collection<String> splitsToPause, Collection<String> splitsToResume) { > splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); > }{code}
[jira] [Created] (FLINK-31208) KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits)
Hongshun Wang created FLINK-31208: - Summary: KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits) Key: FLINK-31208 URL: https://issues.apache.org/jira/browse/FLINK-31208 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Hongshun Wang KafkaSourceReader needlessly overrides a method (pauseOrResumeSplits) that is identical to the one in its parent class (SourceReaderBase). Why not remove this override? The relevant code is below; as you can see, there is no difference: {code:java} //org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits @Override public void pauseOrResumeSplits( Collection<String> splitsToPause, Collection<String> splitsToResume) { splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); } //org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits @Override public void pauseOrResumeSplits( Collection<String> splitsToPause, Collection<String> splitsToResume) { splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume); }{code}
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116556431 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1085,27 +1081,27 @@ public void testNotifyCheckpointOnClosedOperator() throws Throwable { harness.streamTask.notifyCheckpointCompleteAsync(1); harness.streamTask.runMailboxStep(); -assertEquals(1, ClosingOperator.notified.get()); -assertFalse(ClosingOperator.closed.get()); +assertThat(ClosingOperator.notified.get()).isOne(); Review Comment: Thank you~. I updated it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116556097 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1121,13 +1117,14 @@ private void testFailToConfirmCheckpointMessage(Consumer> consu StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(streamMap).build(); -try { -consumer.accept(harness.streamTask); -harness.streamTask.runMailboxLoop(); -fail(); -} catch (ExpectedTestException expected) { -// expected exceptionestProcessWithUnAvailableInput -} +// expected exceptionestProcessWithUnAvailableInput +assertThatThrownBy( +() -> { +consumer.accept(harness.streamTask); +harness.streamTask.runMailboxLoop(); +fail(null); Review Comment: It's from the old line. And I removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116555777 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> { +testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); +throw new RuntimeException( Review Comment: It's the old logic line from migration. but redundant. This lines rollback to the original lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RocMarshal commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
RocMarshal commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116555051 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> { +testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); +throw new RuntimeException( +"Expected an exception but ran successfully"); +}) +.isInstanceOf(ExpectedTestException.class); -try { -testHarness.finishProcessing(); -} catch (Exception ex) { -// todo: checking for suppression if there are more exceptions during cleanup -ExceptionUtils.assertThrowable(ex, FailingTwiceOperator.CloseException.class); -} +// todo: checking for suppression if there are more exceptions during cleanup +assertThatThrownBy(testHarness::finishProcessing) +.isInstanceOf(FailingTwiceOperator.CloseException.class); Review Comment: nice catch~I compared `assertThrowable`logic with `isInstanceOf `, It's indeed as your description. I try to find the logic-equal API to replaced it but no result. 
So I rolled back to the original `assertThrowable` implementation. Of course, I'd be happy to accept any helpful suggestions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
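The behavioral difference discussed in this review thread can be shown with plain JDK code. The helper below is a simplified stand-in for Flink's `ExceptionUtils.findThrowable`, not the actual utility: AssertJ's `isInstanceOf` inspects only the thrown exception itself, while `assertThrowable` searches the whole cause chain.

```java
import java.util.Optional;

public class CauseChainDemo {

    // Simplified version of what ExceptionUtils.findThrowable does: walk the cause chain
    // until an exception of the requested type is found.
    static Optional<Throwable> findThrowable(Throwable t, Class<? extends Throwable> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return Optional.of(t);
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Throwable thrown = new RuntimeException("wrapper", new IllegalStateException("root cause"));

        // What isInstanceOf checks: the top-level exception only.
        System.out.println(thrown instanceof IllegalStateException); // prints "false"

        // What assertThrowable checks: any exception anywhere in the cause chain.
        System.out.println(findThrowable(thrown, IllegalStateException.class).isPresent()); // prints "true"
    }
}
```

AssertJ's `hasCauseInstanceOf` and `hasRootCauseInstanceOf` cover the direct cause and the root cause respectively, but neither searches arbitrary depths of the chain, which is presumably why no drop-in replacement was found.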
[jira] [Commented] (FLINK-22484) Built-in functions for collections
[ https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693015#comment-17693015 ]

jackylau commented on FLINK-22484:
----------------------------------

hi [~Sergey Nuyanzin], how about adding the map_entries/map_from_entries/map_from_arrays/map_contains_key functions? What do you think? Higher-order functions like transform need Calcite support first.

> Built-in functions for collections
> ----------------------------------
>
> Key: FLINK-22484
> URL: https://issues.apache.org/jira/browse/FLINK-22484
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Sergey Nuyanzin
> Priority: Major
> Labels: pull-request-available
>
> A number of built-in functions for working with collections are supported by other vendors. After looking at PostgreSQL, BigQuery, and Spark, a list of more or less generic functions for collections was selected (for more details see [1]).
> Feedback on the doc is welcome.
> [1] [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing]
> MAP_KEYS
> MAP_VALUES
> MAP_FROM_ARRAYS

-- This message was sent by Atlassian Jira (v8.20.10#820010)
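To make the proposal concrete, here is a plain-Java sketch of the intended semantics of the suggested functions. This is illustrative only; the real implementations would be Flink SQL built-ins, and the names mirror the proposed SQL functions rather than any existing Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapFunctionSemantics {

    // MAP_FROM_ARRAYS(keys, values): zip two equal-length arrays into a map.
    static <K, V> Map<K, V> mapFromArrays(K[] keys, V[] values) {
        if (keys.length != values.length) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            result.put(keys[i], values[i]);
        }
        return result;
    }

    // MAP_ENTRIES(map): the map as a list of (key, value) rows.
    static <K, V> List<Map.Entry<K, V>> mapEntries(Map<K, V> map) {
        return new ArrayList<>(map.entrySet());
    }

    // MAP_CONTAINS_KEY(map, key): membership test on the key set.
    static <K, V> boolean mapContainsKey(Map<K, V> map, K key) {
        return map.containsKey(key);
    }

    public static void main(String[] args) {
        Map<String, Integer> m = mapFromArrays(new String[] {"a", "b"}, new Integer[] {1, 2});
        System.out.println(m);                      // prints "{a=1, b=2}"
        System.out.println(mapContainsKey(m, "a")); // prints "true"
    }
}
```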
[GitHub] [flink] reswqa commented on a diff in pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
reswqa commented on code in PR #21999: URL: https://github.com/apache/flink/pull/21999#discussion_r1116492529 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> { +testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); +throw new RuntimeException( Review Comment: Why throw exception here? ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1085,27 +1081,27 @@ public void testNotifyCheckpointOnClosedOperator() throws Throwable { harness.streamTask.notifyCheckpointCompleteAsync(1); harness.streamTask.runMailboxStep(); -assertEquals(1, ClosingOperator.notified.get()); -assertFalse(ClosingOperator.closed.get()); +assertThat(ClosingOperator.notified.get()).isOne(); Review Comment: IIRC, AssertJ does have specific assertions for `AtomicXXX`, please avoid using `get`. It is also necessary to check the similar problems of the whole test class. 
## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -1121,13 +1117,14 @@ private void testFailToConfirmCheckpointMessage(Consumer> consu StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(streamMap).build(); -try { -consumer.accept(harness.streamTask); -harness.streamTask.runMailboxLoop(); -fail(); -} catch (ExpectedTestException expected) { -// expected exceptionestProcessWithUnAvailableInput -} +// expected exceptionestProcessWithUnAvailableInput +assertThatThrownBy( +() -> { +consumer.accept(harness.streamTask); +harness.streamTask.runMailboxLoop(); +fail(null); Review Comment: Why we need `fail`? ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java: ## @@ -362,30 +357,28 @@ private void testSyncSavepointWithEndInput( "savepointResult"); harness.processAll(); -Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + assertThat(TestBoundedOneInputStreamOperator.isInputEnded()).isEqualTo(expectEndInput); } @Test -public void testCleanUpExceptionSuppressing() throws Exception { +void testCleanUpExceptionSuppressing() throws Exception { try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) .addInput(STRING_TYPE_INFO) .setupOutputForSingletonOperatorChain(new FailingTwiceOperator()) .build()) { -try { -testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); -throw new RuntimeException("Expected an exception but ran successfully"); -} catch (Exception ex) { -ExceptionUtils.assertThrowable(ex, ExpectedTestException.class); -} +assertThatThrownBy( +() -> { +testHarness.processElement(new StreamRecord<>("Doesn't matter", 0)); +throw new RuntimeException( +"Expected an exception but ran successfully"); +}) +.isInstanceOf(ExpectedTestException
[jira] [Updated] (FLINK-31207) Supports high order function like other engine
[ https://issues.apache.org/jira/browse/FLINK-31207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jackylau updated FLINK-31207:
-----------------------------
    Description:
Spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]

transform/transform_keys/transform_values

After Calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports higher-order functions, we should support many higher-order functions like Spark/Presto do.

  was: Spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform] After Calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports higher-order functions, we should support many higher-order functions like Spark/Presto do.

> Supports high order function like other engine
> ----------------------------------------------
>
> Key: FLINK-31207
> URL: https://issues.apache.org/jira/browse/FLINK-31207
> Project: Flink
> Issue Type: Sub-task
> Reporter: jackylau
> Priority: Major
>
> Spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]
> transform/transform_keys/transform_values
> After Calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports higher-order functions, we should support many higher-order functions like Spark/Presto do.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
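For reference, the Spark/Presto-style semantics of `transform`/`transform_keys`/`transform_values` can be sketched in plain Java. This is illustrative only; the actual Flink SQL versions would depend on Calcite's higher-order function support.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

public class HigherOrderFunctionSemantics {

    // transform(array, f): apply f to every element of the array.
    static <T, R> List<R> transform(List<T> xs, Function<T, R> f) {
        return xs.stream().map(f).collect(Collectors.toList());
    }

    // transform_keys(map, f): rewrite every key with f(key, value), keeping values.
    static <K, V, K2> Map<K2, V> transformKeys(Map<K, V> m, BiFunction<K, V, K2> f) {
        Map<K2, V> out = new LinkedHashMap<>();
        m.forEach((k, v) -> out.put(f.apply(k, v), v));
        return out;
    }

    // transform_values(map, f): rewrite every value with f(key, value), keeping keys.
    static <K, V, V2> Map<K, V2> transformValues(Map<K, V> m, BiFunction<K, V, V2> f) {
        Map<K, V2> out = new LinkedHashMap<>();
        m.forEach((k, v) -> out.put(k, f.apply(k, v)));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(transform(Arrays.asList(1, 2, 3), x -> x + 1)); // prints "[2, 3, 4]"
    }
}
```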
[GitHub] [flink-table-store] zhuangchong opened a new pull request, #556: [hotfix] Add version reference in the pom
zhuangchong opened a new pull request, #556: URL: https://github.com/apache/flink-table-store/pull/556 Add a version reference for the `kafka` dependency package -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-22091) env.java.home option didn't take effect in resource negotiator
[ https://issues.apache.org/jira/browse/FLINK-22091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693008#comment-17693008 ]

Samrat Deb commented on FLINK-22091:
------------------------------------

I would like to work on this issue.

> env.java.home option didn't take effect in resource negotiator
> --------------------------------------------------------------
>
> Key: FLINK-22091
> URL: https://issues.apache.org/jira/browse/FLINK-22091
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Affects Versions: 1.11.1, 1.12.2
> Reporter: zlzhang0122
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> If we have set the value of env.java.home in flink-conf.yaml, it will take effect in standalone mode, but it won't take effect in resource negotiators such as YARN, Kubernetes, etc. Maybe we can make a change so that it takes effect there?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31207) Supports high order function like other engine
[ https://issues.apache.org/jira/browse/FLINK-31207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jackylau updated FLINK-31207:
-----------------------------
    Summary: Supports high order function like other engine  (was: supports high order function like other engine)

> Supports high order function like other engine
> ----------------------------------------------
>
> Key: FLINK-31207
> URL: https://issues.apache.org/jira/browse/FLINK-31207
> Project: Flink
> Issue Type: Sub-task
> Reporter: jackylau
> Priority: Major
>
> Spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]
> After Calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports higher-order functions, we should support many higher-order functions like Spark/Presto do.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31207) supports high order function like other engine
jackylau created FLINK-31207:
--------------------------------

             Summary: supports high order function like other engine
                 Key: FLINK-31207
                 URL: https://issues.apache.org/jira/browse/FLINK-31207
             Project: Flink
          Issue Type: Sub-task
            Reporter: jackylau

Spark [https://spark.apache.org/docs/latest/api/sql/index.html#transform]

After Calcite https://issues.apache.org/jira/browse/CALCITE-3679 supports higher-order functions, we should support many higher-order functions like Spark/Presto do.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] 1996fanrui commented on pull request #21999: [FLINK-29816][streaming] Fix the bug that StreamTask doesn't handle exception during restoring
1996fanrui commented on PR #21999: URL: https://github.com/apache/flink/pull/21999#issuecomment-1442822176

Please squash these commits, and follow the rules that @reswqa mentioned before.

> This PR will eventually have two commits in the following order:
> 1. [hotfix] Migrate StreamTaskTest to Junit5 and Assertj
> 2. [[FLINK-29816](https://issues.apache.org/jira/browse/FLINK-29816)][streaming] Fix the bug that StreamTask

And note the commit order: the hotfix commit should come before your commit; otherwise, you would need to write the new test with JUnit 4 and then upgrade it in the next commit.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31196) Flink on YARN honors env.java.home
[ https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prabhu Joseph updated FLINK-31196:
----------------------------------
    Description:
Flink on YARN should honor env.java.home and launch the JobManager and TaskManager containers with the configured env.java.home.

One option to set the JAVA_HOME for the Flink JobManager and TaskManager running on YARN:
{code:java}
containerized.master.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk
containerized.taskmanager.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk
{code}

  was: Flink on YARN should honor env.java.home and launch the JobManager and TaskManager containers with the configured env.java.home.

> Flink on YARN honors env.java.home
> ----------------------------------
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / YARN
> Affects Versions: 1.16.1
> Reporter: Prabhu Joseph
> Priority: Major
>
> Flink on YARN should honor env.java.home and launch the JobManager and TaskManager containers with the configured env.java.home.
> One option to set the JAVA_HOME for the Flink JobManager and TaskManager running on YARN:
> {code:java}
> containerized.master.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk
> containerized.taskmanager.env.JAVA_HOME: /usr/lib/jvm/java-11-openjdk
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-31196) Flink on YARN honors env.java.home
[ https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prabhu Joseph resolved FLINK-31196.
-----------------------------------
    Resolution: Duplicate

> Flink on YARN honors env.java.home
> ----------------------------------
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / YARN
> Affects Versions: 1.16.1
> Reporter: Prabhu Joseph
> Priority: Major
>
> Flink on YARN should honor env.java.home and launch the JobManager and TaskManager containers with the configured env.java.home.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31196) Flink on YARN honors env.java.home
[ https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693003#comment-17693003 ]

Prabhu Joseph commented on FLINK-31196:
---------------------------------------

Thanks [~zlzhang0122] for the update. I missed searching the Flink Jira list properly before raising this ticket. We will close it as a duplicate and provide a patch for the original one, FLINK-22091.

> Flink on YARN honors env.java.home
> ----------------------------------
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / YARN
> Affects Versions: 1.16.1
> Reporter: Prabhu Joseph
> Priority: Major
>
> Flink on YARN should honor env.java.home and launch the JobManager and TaskManager containers with the configured env.java.home.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-20395) Migrate test_netty_shuffle_memory_control.sh
[ https://issues.apache.org/jira/browse/FLINK-20395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-20395: -- Assignee: Weijie Guo > Migrate test_netty_shuffle_memory_control.sh > > > Key: FLINK-20395 > URL: https://issues.apache.org/jira/browse/FLINK-20395 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network, Test Infrastructure >Reporter: Matthias Pohl >Assignee: Weijie Guo >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31206) Broken links on flink.apache.org
Zili Chen created FLINK-31206:
-------------------------------

             Summary: Broken links on flink.apache.org
                 Key: FLINK-31206
                 URL: https://issues.apache.org/jira/browse/FLINK-31206
             Project: Flink
          Issue Type: Bug
            Reporter: Zili Chen

The previous page link https://flink.apache.org/contribute/code-style-and-quality/preamble/ is broken; the new link is https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/.

Shall we set up a redirect, or just leave those broken links for maintainers to fix?

cc [~martijnvisser]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-31204) HiveCatalogITCase fails due to avro conflict in table store
[ https://issues.apache.org/jira/browse/FLINK-31204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31204. Fix Version/s: table-store-0.4.0 Assignee: Shammon Resolution: Fixed master: 306a9ededbebc1d825cbb02c18338f5accf7faca > HiveCatalogITCase fails due to avro conflict in table store > --- > > Key: FLINK-31204 > URL: https://issues.apache.org/jira/browse/FLINK-31204 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Shammon >Assignee: Shammon >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Test fails in IDEA > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > ... 4 more > Caused by: java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:203) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:172) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:55) > at > org.apache.flink.table.store.format.avro.AvroFileFormat$AvroGenericRecordBulkFormat.(AvroFileFormat.java:95) > at > org.apache.flink.table.store.format.avro.AvroFileFormat.createReaderFactory(AvroFileFormat.java:80) > at > org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:71) > at > org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:67) > at > 
org.apache.flink.table.store.file.manifest.ManifestList$Factory.create(ManifestList.java:130) > at > org.apache.flink.table.store.file.operation.AbstractFileStoreScan.(AbstractFileStoreScan.java:95) > at > org.apache.flink.table.store.file.operation.KeyValueFileStoreScan.(KeyValueFileStoreScan.java:57) > at > org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:118) > at > org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:71) > at > org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:38) > at > org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:116) > at > org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:43) > at > org.apache.flink.table.store.table.AbstractFileStoreTable.newCommit(AbstractFileStoreTable.java:121) > at > org.apache.flink.table.store.connector.sink.FileStoreSink.lambda$createCommitterFactory$63124b4e$1(FileStoreSink.java:69) > at > org.apache.flink.table.store.connector.sink.CommitterOperator.initializeState(CommitterOperator.java:104) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) > at > 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > at java.lang.Thread.run(Thread.java:750) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wanglijie95 commented on pull request #20006: [FLINK-27415][Connectors / FileSystem] Read empty csv file throws exception in FileSystem table connector
wanglijie95 commented on PR #20006: URL: https://github.com/apache/flink/pull/20006#issuecomment-1442775810 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi merged pull request #555: [FLINK-31204] Fix HiveCatalogITCase fails in IDEA
JingsongLi merged PR #555: URL: https://github.com/apache/flink-table-store/pull/555 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wanglijie95 commented on pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
wanglijie95 commented on PR #21963: URL: https://github.com/apache/flink/pull/21963#issuecomment-1442773339 Thanks for the review @JunRuiLee @zhuzhurk. I've addressed all the comments, please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
wanglijie95 commented on code in PR #21963: URL: https://github.com/apache/flink/pull/21963#discussion_r1116480439 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ## @@ -1060,14 +,97 @@ private void setVertexConfig( vertexConfigs.put(vertexID, config); } +private void setChainedOutputsConfig( +Integer vertexId, StreamConfig config, List chainableOutputs) { +// iterate edges, find sideOutput edges create and save serializers for each outputTag type +for (StreamEdge edge : chainableOutputs) { +if (edge.getOutputTag() != null) { +config.setTypeSerializerSideOut( +edge.getOutputTag(), +edge.getOutputTag() +.getTypeInfo() + .createSerializer(streamGraph.getExecutionConfig())); +} +} +config.setChainedOutputs(chainableOutputs); +} + +private void setOperatorNonChainedOutputsConfig( +Integer vertexId, +StreamConfig config, +List nonChainableOutputs, +Map outputsConsumedByEdge) { +// iterate edges, find sideOutput edges create and save serializers for each outputTag type +for (StreamEdge edge : nonChainableOutputs) { +if (edge.getOutputTag() != null) { +config.setTypeSerializerSideOut( +edge.getOutputTag(), +edge.getOutputTag() +.getTypeInfo() + .createSerializer(streamGraph.getExecutionConfig())); +} +} + +List deduplicatedOutputs = +mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, outputsConsumedByEdge); +config.setNumberOfOutputs(deduplicatedOutputs.size()); +config.setOperatorNonChainedOutputs(deduplicatedOutputs); +} + +private void setVertexNonChainedOutputsConfig( +Integer startNodeId, +StreamConfig config, +List transitiveOutEdges, +final Map> opIntermediateOutputs) { + +LinkedHashSet transitiveOutputs = new LinkedHashSet<>(); +for (StreamEdge edge : transitiveOutEdges) { +NonChainedOutput output = opIntermediateOutputs.get(edge.getSourceId()).get(edge); +transitiveOutputs.add(output); +connect(startNodeId, edge, output); +} + +config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs)); 
+} + +private void setAllOperatorNonChainedOutputsConfigs( +final Map> opIntermediateOutputs) { +// set non chainable output config +opNonChainableOutputsCache.forEach( +(vertexId, nonChainableOutputs) -> { +Map outputsConsumedByEdge = +opIntermediateOutputs.computeIfAbsent( +vertexId, ignored -> new HashMap<>()); +setOperatorNonChainedOutputsConfig( +vertexId, +vertexConfigs.get(vertexId), +nonChainableOutputs, +outputsConsumedByEdge); +}); +} + +private void setAllVertexNonChainedOutputsConfigs( +final Map> opIntermediateOutputs) { +jobVertices +.keySet() +.forEach( +startNodeId -> { +setVertexNonChainedOutputsConfig( +startNodeId, +vertexConfigs.get(startNodeId), + chainInfos.get(startNodeId).getTransitiveOutEdges(), +opIntermediateOutputs); +}); Review Comment: I'm wrong. It's can be removed :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31107) Can't subscribe the non-persistent topics by using regex in Pulsar 2.11.0
[ https://issues.apache.org/jira/browse/FLINK-31107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yufan Sheng updated FLINK-31107:
--------------------------------
    Description:
This is a ticket for tracking [the known issue|https://github.com/apache/pulsar/issues/19316] in Pulsar. Pulsar changed its internal logic for topic-pattern subscription and no longer returns any {{non-persistent}} topics. We will close this issue after Pulsar fixes the bug.

Another issue tracking this bug: https://github.com/apache/pulsar/issues/19493

  was: This is a ticket for tracking [the known issue|https://github.com/apache/pulsar/issues/19316] in Pulsar. Pulsar changed its internal logic for topic-pattern subscription and no longer returns any {{non-persistent}} topics. We will close this issue after Pulsar fixes the bug.

> Can't subscribe the non-persistent topics by using regex in Pulsar 2.11.0
> -------------------------------------------------------------------------
>
> Key: FLINK-31107
> URL: https://issues.apache.org/jira/browse/FLINK-31107
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: pulsar-4.0.0
> Reporter: Yufan Sheng
> Priority: Minor
>
> This is a ticket for tracking [the known issue|https://github.com/apache/pulsar/issues/19316] in Pulsar. Pulsar changed its internal logic for topic-pattern subscription and no longer returns any {{non-persistent}} topics. We will close this issue after Pulsar fixes the bug.
> Another issue tracking this bug: https://github.com/apache/pulsar/issues/19493

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
wanglijie95 commented on code in PR #21963: URL: https://github.com/apache/flink/pull/21963#discussion_r1116477683 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ## @@ -568,12 +592,7 @@ && isChainableInput(sourceOutEdge, streamGraph)) { final StreamConfig.SourceInputConfig inputConfig = new StreamConfig.SourceInputConfig(sourceOutEdge); final StreamConfig operatorConfig = new StreamConfig(new Configuration()); -setVertexConfig( -sourceNodeId, -operatorConfig, -Collections.emptyList(), -Collections.emptyList(), -Collections.emptyMap()); +setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap()); Review Comment: Yes. I will modify it to the same logic as before
[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
wanglijie95 commented on code in PR #21963: URL: https://github.com/apache/flink/pull/21963#discussion_r1116477303 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroup.java: ## @@ -53,9 +55,22 @@ public ForwardGroup(final Set<JobVertex> jobVertices) { .map(JobVertex::getParallelism) .collect(Collectors.toSet()); -checkState(decidedParallelisms.size() <= 1); -if (decidedParallelisms.size() == 1) { -this.parallelism = decidedParallelisms.iterator().next(); +checkState(configuredParallelisms.size() <= 1); +if (configuredParallelisms.size() == 1) { +this.parallelism = configuredParallelisms.iterator().next(); +} + +Set<Integer> configuredMaxParallelisms = +jobVertices.stream() +.map(JobVertex::getMaxParallelism) +.filter(val -> val > 0) +.collect(Collectors.toSet()); + +if (!configuredMaxParallelisms.isEmpty()) { +this.maxParallelism = Collections.min(configuredMaxParallelisms); +checkState( +maxParallelism >= parallelism, Review Comment: I will change the check to `checkState(parallelism == -1 || maxParallelism >= parallelism)`
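For illustration, the revised check quoted above (`checkState(parallelism == -1 || maxParallelism >= parallelism)`) can be sketched as a standalone snippet; the class and method names here are hypothetical and not part of the Flink codebase:

```java
// Minimal sketch of the consistency check discussed in the review comment
// above: parallelism == -1 means "not yet decided", and a decided
// parallelism must not exceed the group's smallest configured max.
public class ForwardGroupCheckDemo {

    static boolean isConsistent(int parallelism, int maxParallelism) {
        return parallelism == -1 || maxParallelism >= parallelism;
    }

    public static void main(String[] args) {
        System.out.println(isConsistent(-1, 4)); // undecided: always passes
        System.out.println(isConsistent(4, 8));  // decided, within bound: passes
        System.out.println(isConsistent(8, 4));  // decided, exceeds bound: fails
    }
}
```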
[GitHub] [flink] venkata91 commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
venkata91 commented on PR #22009: URL: https://github.com/apache/flink/pull/22009#issuecomment-1442766757 Looking into the test failure; somehow it worked locally but is failing in the pipeline.
[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
wanglijie95 commented on code in PR #21963: URL: https://github.com/apache/flink/pull/21963#discussion_r1116476622 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ## @@ -928,14 +1011,8 @@ private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo cha return new StreamConfig(jobVertex.getConfiguration()); Review Comment: Done
[GitHub] [flink] wanglijie95 commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
wanglijie95 commented on code in PR #21963: URL: https://github.com/apache/flink/pull/21963#discussion_r1116476465 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ## @@ -1060,14 +,97 @@ private void setVertexConfig( vertexConfigs.put(vertexID, config); } +private void setChainedOutputsConfig( +Integer vertexId, StreamConfig config, List<StreamEdge> chainableOutputs) { +// iterate edges, find sideOutput edges create and save serializers for each outputTag type +for (StreamEdge edge : chainableOutputs) { +if (edge.getOutputTag() != null) { +config.setTypeSerializerSideOut( +edge.getOutputTag(), +edge.getOutputTag() +.getTypeInfo() + .createSerializer(streamGraph.getExecutionConfig())); +} +} +config.setChainedOutputs(chainableOutputs); +} + +private void setOperatorNonChainedOutputsConfig( +Integer vertexId, +StreamConfig config, +List<StreamEdge> nonChainableOutputs, +Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) { +// iterate edges, find sideOutput edges create and save serializers for each outputTag type +for (StreamEdge edge : nonChainableOutputs) { +if (edge.getOutputTag() != null) { +config.setTypeSerializerSideOut( +edge.getOutputTag(), +edge.getOutputTag() +.getTypeInfo() + .createSerializer(streamGraph.getExecutionConfig())); +} +} + +List<NonChainedOutput> deduplicatedOutputs = +mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, outputsConsumedByEdge); +config.setNumberOfOutputs(deduplicatedOutputs.size()); +config.setOperatorNonChainedOutputs(deduplicatedOutputs); +} + +private void setVertexNonChainedOutputsConfig( +Integer startNodeId, +StreamConfig config, +List<StreamEdge> transitiveOutEdges, +final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) { + +LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>(); +for (StreamEdge edge : transitiveOutEdges) { +NonChainedOutput output = opIntermediateOutputs.get(edge.getSourceId()).get(edge); +transitiveOutputs.add(output); +connect(startNodeId, edge, output); +} + +config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs)); 
+} + +private void setAllOperatorNonChainedOutputsConfigs( +final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) { +// set non chainable output config +opNonChainableOutputsCache.forEach( +(vertexId, nonChainableOutputs) -> { +Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge = +opIntermediateOutputs.computeIfAbsent( +vertexId, ignored -> new HashMap<>()); +setOperatorNonChainedOutputsConfig( +vertexId, +vertexConfigs.get(vertexId), +nonChainableOutputs, +outputsConsumedByEdge); +}); +} + +private void setAllVertexNonChainedOutputsConfigs( +final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) { +jobVertices +.keySet() +.forEach( +startNodeId -> { +setVertexNonChainedOutputsConfig( +startNodeId, +vertexConfigs.get(startNodeId), + chainInfos.get(startNodeId).getTransitiveOutEdges(), +opIntermediateOutputs); +}); Review Comment: The {} is needed because the `setVertexNonChainedOutputsConfig` has no return value.
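As a side note on the review comment above: in Java, a lambda whose body is a single call to a void method can be written either as an expression lambda or with a block body, while multiple statements always require braces. A small self-contained sketch (names here are hypothetical, not Flink code):

```java
import java.util.ArrayList;
import java.util.List;

public class LambdaBodyDemo {
    static final List<String> log = new ArrayList<>();

    // Stands in for a void method such as setVertexNonChainedOutputsConfig.
    static void configure(int nodeId) {
        log.add("configured " + nodeId);
    }

    public static void main(String[] args) {
        List<Integer> startNodes = List.of(1, 2, 3);
        // A single void call is valid as an expression lambda (no braces).
        startNodes.forEach(id -> configure(id));
        // Multiple statements need a block body with braces.
        startNodes.forEach(id -> {
            int shifted = id + 100;
            configure(shifted);
        });
    }
}
```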
[GitHub] [flink] wuchong commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader
wuchong commented on PR #21966: URL: https://github.com/apache/flink/pull/21966#issuecomment-1442759082 @stayrascal thank you for helping to verify it.
[GitHub] [flink] lsyldliu commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader
lsyldliu commented on PR #21966: URL: https://github.com/apache/flink/pull/21966#issuecomment-1442750732 @stayrascal Thanks very much for double-checking. I think it works well because it runs on the JM side.
[jira] [Closed] (FLINK-31117) Split flink connector to each module of each version
[ https://issues.apache.org/jira/browse/FLINK-31117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31117. Resolution: Fixed master: 37f75a85091697b75c6968293edfb060595adae3 > Split flink connector to each module of each version > > > Key: FLINK-31117 > URL: https://issues.apache.org/jira/browse/FLINK-31117 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > This will make compilation and testing much easier.
[GitHub] [flink-table-store] JingsongLi merged pull request #545: [FLINK-31117] Split flink connector to each module of each version
JingsongLi merged PR #545: URL: https://github.com/apache/flink-table-store/pull/545
[GitHub] [flink] stayrascal commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader
stayrascal commented on PR #21966: URL: https://github.com/apache/flink/pull/21966#issuecomment-1442721215 @lsyldliu thanks a lot for looking at this issue. May I check whether we could use the Flink filesystem instead of the Hadoop filesystem to open the parquet file, so that the Flink fs can retrieve the configured credential info (or fs properties) from flink-conf.yaml automatically?
[GitHub] [flink-table-store] FangYongs commented on a diff in pull request #555: [FLINK-31204] Fix HiveCatalogITCase fails in IDEA
FangYongs commented on code in PR #555: URL: https://github.com/apache/flink-table-store/pull/555#discussion_r1116445519 ## flink-table-store-hive/flink-table-store-hive-catalog/pom.xml: ## @@ -535,6 +535,65 @@ under the License. org.jamon jamon-runtime + +org.apache.hive +hive-exec + + + + + + +org.apache.avro +avro +${avro.version} +test + + + +org.apache.hive +hive-exec +${hive.version} +provided Review Comment: Done
[jira] [Commented] (FLINK-22922) Migrate flink project website to Hugo
[ https://issues.apache.org/jira/browse/FLINK-22922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692982#comment-17692982 ] Zili Chen commented on FLINK-22922: --- Hi [~martijnvisser]! I see that we now use `asf-site` for both dev and deploy. How can we ensure the website is always rebuilt, or do we currently do it manually? > Migrate flink project website to Hugo > - > > Key: FLINK-22922 > URL: https://issues.apache.org/jira/browse/FLINK-22922 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Chesnay Schepler >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > > Hugo is working like a charm for the Flink documentation. To reduce the > number of software stacks, and massively reduce friction when building the > current Flink website, we should migrate the Flink website to hugo as well.
[GitHub] [flink] Mulavar commented on pull request #21545: [FLINK-30396][table]make alias hint take effect in correlate
Mulavar commented on PR #21545: URL: https://github.com/apache/flink/pull/21545#issuecomment-1442717916 @flinkbot run azure
[jira] [Updated] (FLINK-31204) HiveCatalogITCase fails due to avro conflict in table store
[ https://issues.apache.org/jira/browse/FLINK-31204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31204: --- Labels: pull-request-available (was: ) > HiveCatalogITCase fails due to avro conflict in table store > --- > > Key: FLINK-31204 > URL: https://issues.apache.org/jira/browse/FLINK-31204 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Shammon >Priority: Major > Labels: pull-request-available > > Test fails in IDEA > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > ... 4 more > Caused by: java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:203) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:172) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147) > at > org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:55) > at > org.apache.flink.table.store.format.avro.AvroFileFormat$AvroGenericRecordBulkFormat.(AvroFileFormat.java:95) > at > org.apache.flink.table.store.format.avro.AvroFileFormat.createReaderFactory(AvroFileFormat.java:80) > at > org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:71) > at > org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:67) > at > org.apache.flink.table.store.file.manifest.ManifestList$Factory.create(ManifestList.java:130) > at > 
org.apache.flink.table.store.file.operation.AbstractFileStoreScan.(AbstractFileStoreScan.java:95) > at > org.apache.flink.table.store.file.operation.KeyValueFileStoreScan.(KeyValueFileStoreScan.java:57) > at > org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:118) > at > org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:71) > at > org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:38) > at > org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:116) > at > org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:43) > at > org.apache.flink.table.store.table.AbstractFileStoreTable.newCommit(AbstractFileStoreTable.java:121) > at > org.apache.flink.table.store.connector.sink.FileStoreSink.lambda$createCommitterFactory$63124b4e$1(FileStoreSink.java:69) > at > org.apache.flink.table.store.connector.sink.CommitterOperator.initializeState(CommitterOperator.java:104) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) > at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > at java.lang.Thread.run(Thread.java:750)
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #555: [FLINK-31204] Fix HiveCatalogITCase fails in IDEA
JingsongLi commented on code in PR #555: URL: https://github.com/apache/flink-table-store/pull/555#discussion_r111644 ## flink-table-store-hive/flink-table-store-hive-catalog/pom.xml: ## @@ -535,6 +535,65 @@ under the License. org.jamon jamon-runtime + +org.apache.hive +hive-exec + + + + + + +org.apache.avro +avro +${avro.version} +test + + + +org.apache.hive +hive-exec +${hive.version} +provided Review Comment: test?
[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692979#comment-17692979 ] Ran Tao commented on FLINK-12450: - [~jark] Hi, Jark. Can you assign this ticket to me? I'm glad to support it. > [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table > API and SQL > --- > > Key: FLINK-12450 > URL: https://issues.apache.org/jira/browse/FLINK-12450 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhanchun Zhang >Priority: Major > Labels: auto-unassigned > > BIT_LSHIFT, Shifts a long number to the left > BIT_RSHIFT, Shifts a long number to the right
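For reference, the requested semantics match Java's built-in long shifts; a small sketch (the helper names are mine, and the eventual Flink SQL function signatures are still to be decided in the ticket):

```java
public class BitShiftDemo {
    // BIT_LSHIFT(x, n): shift a long left by n bits (x * 2^n, modulo overflow).
    static long bitLshift(long x, int n) {
        return x << n;
    }

    // BIT_RSHIFT(x, n): arithmetic right shift by n bits (sign-preserving).
    static long bitRshift(long x, int n) {
        return x >> n;
    }

    public static void main(String[] args) {
        System.out.println(bitLshift(5L, 2));  // 20
        System.out.println(bitRshift(20L, 2)); // 5
    }
}
```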
[jira] [Commented] (FLINK-12449) [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692978#comment-17692978 ] Ran Tao commented on FLINK-12449: - [~lzljs3620320] Hi, Jingsong, can you assign this ticket to me? I'm glad to support it. > [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and > SQL > - > > Key: FLINK-12449 > URL: https://issues.apache.org/jira/browse/FLINK-12449 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhanchun Zhang >Priority: Major > Labels: auto-unassigned > > Bitwise AND. > eg. SELECT BIT_AND(29,15), returns 13 > Bitwise OR > eg. SELECT BIT_OR(29 ,15), returns 31 -- This message was sent by Atlassian Jira (v8.20.10#820010)
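The example values in the ticket can be verified directly with Java's bitwise operators (a sketch; the helper names are mine, not the proposed Flink API):

```java
public class BitwiseDemo {
    // BIT_AND semantics: 29 = 0b11101, 15 = 0b01111 -> 0b01101 = 13
    static long bitAnd(long a, long b) {
        return a & b;
    }

    // BIT_OR semantics: 0b11101 | 0b01111 = 0b11111 = 31
    static long bitOr(long a, long b) {
        return a | b;
    }

    public static void main(String[] args) {
        System.out.println(bitAnd(29, 15)); // 13
        System.out.println(bitOr(29, 15));  // 31
    }
}
```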
[GitHub] [flink-table-store] JingsongLi merged pull request #553: Unify the version of `kafka` in pom.
JingsongLi merged PR #553: URL: https://github.com/apache/flink-table-store/pull/553
[GitHub] [flink] flinkbot commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
flinkbot commented on PR #22009: URL: https://github.com/apache/flink/pull/22009#issuecomment-1442699380 ## CI report: * c7e58124ab18ef98248a279359ee9014071847dc UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] lsyldliu commented on pull request #21966: [FLINK-29729][parquet] Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader
lsyldliu commented on PR #21966: URL: https://github.com/apache/flink/pull/21966#issuecomment-1442697375 @wuchong Thanks for reviewing. Regarding your question, we can't verify the configuration in ParquetReader now because the `ParquetFileReader` doesn't contain the credential info. The credential info is only used when creating the Hadoop FileSystem. I have checked the related code that creates the FileSystem: it loads the Hadoop conf from flink-conf.yaml first and then creates the corresponding FileSystem; refer to https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java#L126
[jira] [Commented] (FLINK-31109) Fails with proxy user not supported even when security.kerberos.fetch.delegation-token is set to false
[ https://issues.apache.org/jira/browse/FLINK-31109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692973#comment-17692973 ] Venkata krishnan Sowrirajan commented on FLINK-31109: - [~martijnvisser] Sure, I have posted a PR with the fix after discussing with [~gaborgsomogyi] . Please take a look whenever you get a chance. Thanks. > Fails with proxy user not supported even when > security.kerberos.fetch.delegation-token is set to false > -- > > Key: FLINK-31109 > URL: https://issues.apache.org/jira/browse/FLINK-31109 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: Venkata krishnan Sowrirajan >Assignee: Venkata krishnan Sowrirajan >Priority: Blocker > Labels: pull-request-available > > With > {code:java} > security.kerberos.fetch.delegation-token: false > {code} > and delegation tokens obtained through our internal service which sets both > HADOOP_TOKEN_FILE_LOCATION to pick up the DTs and also sets the > HADOOP_PROXY_USER which fails with the below error > {code:java} > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/export/home/vsowrira/flink-1.18-SNAPSHOT/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/export/apps/hadoop/hadoop-bin_2100503/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. 
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] > org.apache.flink.runtime.security.modules.SecurityModule$SecurityInstallException: > Unable to set the Hadoop login user > at > org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:106) > at > org.apache.flink.runtime.security.SecurityUtils.installModules(SecurityUtils.java:76) > at > org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:57) > at > org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1188) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157) > Caused by: java.lang.UnsupportedOperationException: Proxy user is not > supported > at > org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider.throwProxyUserNotSupported(KerberosLoginProvider.java:137) > at > org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider.isLoginPossible(KerberosLoginProvider.java:81) > at > org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:73) > ... 4 more > {code} > This seems to have gotten changed after > [480e6edf|https://github.com/apache/flink/commit/480e6edf9732f8334ef7576080fdbfc98051cb28] > ([FLINK-28330][runtime][security] Remove old delegation token framework code)
[GitHub] [flink] venkata91 commented on pull request #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
venkata91 commented on PR #22009: URL: https://github.com/apache/flink/pull/22009#issuecomment-1442696039 Please review. cc @MartijnVisser @gaborgsomogyi @becketqin
[jira] [Updated] (FLINK-31109) Fails with proxy user not supported even when security.kerberos.fetch.delegation-token is set to false
[ https://issues.apache.org/jira/browse/FLINK-31109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31109: --- Labels: pull-request-available (was: ) > Fails with proxy user not supported even when > security.kerberos.fetch.delegation-token is set to false > -- > > Key: FLINK-31109 > URL: https://issues.apache.org/jira/browse/FLINK-31109 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.0 >Reporter: Venkata krishnan Sowrirajan >Assignee: Venkata krishnan Sowrirajan >Priority: Blocker > Labels: pull-request-available
[GitHub] [flink] venkata91 opened a new pull request, #22009: [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f…
venkata91 opened a new pull request, #22009: URL: https://github.com/apache/flink/pull/22009 …etch is disabled ## What is the purpose of the change FLINK-28330 removed the old delegation token framework code and, as part of it, removed the existing support for delegation tokens that are managed outside of Flink ## Brief change log - As part of `YarnClusterDescriptor#setTokensFor`, load the available delegation tokens on the client machine and then load the tokens obtained through `DelegationTokenManager` if `security.delegation.tokens.enabled` is set to true. - `HadoopModule` should throw an exception if the `UserGroupInformation.currentUser` is a Hadoop proxy user and `security.delegation.tokens.enabled` is also set to true. ## Verifying this change - Added tests in `HadoopModuleTest` and in `KerberosLoginProviderITCase`. - Manually tested in our env where delegation token fetch is managed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Created] (FLINK-31205) do optimize for multi sink in a single relNode tree
Aitozi created FLINK-31205: -- Summary: do optimize for multi sink in a single relNode tree Key: FLINK-31205 URL: https://issues.apache.org/jira/browse/FLINK-31205 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Aitozi Flink supports multi-sink usage, but it optimizes each sink in an individual RelNode tree; this misses some opportunities for cross-tree optimization, e.g.: {code:java} create table newX( a int, b bigint, c varchar, d varchar, e varchar ) with ( 'connector' = 'values', 'enable-projection-push-down' = 'true' ) insert into sink_table select a, b from newX insert into sink_table select a, 1 from newX {code} It produces the plan below, which causes the source to be consumed twice: {code:java} Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b]) Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- Calc(select=[a, 1 AS b]) +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a], metadata=[]]], fields=[a]) {code} In this ticket, I propose a global optimization for the multi-sink case: * Merge the multiple sinks (writing to the same table) into a single RelNode tree with an extra union node * After optimization, split the merged union back into the original multiple sinks In my PoC, after step 1 it produces the plan below, which I think benefits global performance: {code:java} Sink(table=[default_catalog.default_database.sink_table], fields=[a, b]) +- Union(all=[true], union=[a, b]) :- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1]) +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1]) +- Reused(reference_id=[1]) {code}
[GitHub] [flink] zlzhang0122 commented on pull request #20183: [FLINK-24306][table]Group by index throw SqlValidatorException
zlzhang0122 commented on PR #20183: URL: https://github.com/apache/flink/pull/20183#issuecomment-1442682491 @wuchong Could you pls help to confirm and review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-31201) Provides option to sort partition for full stage in streaming read
[ https://issues.apache.org/jira/browse/FLINK-31201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-31201: Assignee: Jingsong Lee > Provides option to sort partition for full stage in streaming read > -- > > Key: FLINK-31201 > URL: https://issues.apache.org/jira/browse/FLINK-31201 > Project: Flink > Issue Type: Improvement > Components: Table Store > Reporter: Jingsong Lee > Assignee: Jingsong Lee > Priority: Blocker > Labels: pull-request-available > Fix For: table-store-0.4.0 > > The overall order may be out of order due to writes to the old partition. We can provide an option to sort the full reading stage by partition fields to avoid the disorder. (Actually, it is currently out of order across partitions because a HashMap is used; perhaps we can sort according to the creation time of the first file?) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31196) Flink on YARN honors env.java.home
[ https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692966#comment-17692966 ] zlzhang0122 commented on FLINK-31196: - Maybe a duplicate of this? [Flink-22091|https://issues.apache.org/jira/browse/FLINK-22091] > Flink on YARN honors env.java.home > -- > > Key: FLINK-31196 > URL: https://issues.apache.org/jira/browse/FLINK-31196 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN > Affects Versions: 1.16.1 > Reporter: Prabhu Joseph > Priority: Major > > Flink on YARN should honor env.java.home and launch JobManager and TaskManager > containers with the configured env.java.home. -- This message was sent by Atlassian Jira (v8.20.10#820010)
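For context, `env.java.home` is an existing Flink configuration option; the ask in this issue is that YARN-launched containers honor it as well. A sketch of the relevant configuration (the JDK path is illustrative):

```yaml
# flink-conf.yaml (illustrative path): the JDK Flink should use; this issue
# requests that YARN-launched JobManager/TaskManager containers honor it too
env.java.home: /usr/lib/jvm/java-11-openjdk
```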
[jira] [Updated] (FLINK-31201) Provides option to sort partition for full stage in streaming read
[ https://issues.apache.org/jira/browse/FLINK-31201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31201: --- Labels: pull-request-available (was: ) > Provides option to sort partition for full stage in streaming read > -- > > Key: FLINK-31201 > URL: https://issues.apache.org/jira/browse/FLINK-31201 > Project: Flink > Issue Type: Improvement > Components: Table Store > Reporter: Jingsong Lee > Priority: Blocker > Labels: pull-request-available > Fix For: table-store-0.4.0 > > The overall order may be out of order due to writes to the old partition. We can provide an option to sort the full reading stage by partition fields to avoid the disorder. (Actually, it is currently out of order across partitions because a HashMap is used; perhaps we can sort according to the creation time of the first file?) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] JingsongLi opened a new pull request, #554: [FLINK-31201] Scan to generate splits should care about file order
JingsongLi opened a new pull request, #554: URL: https://github.com/apache/flink-table-store/pull/554 Currently, splits are generated out of order across partitions because a HashMap is used; we may be able to use a LinkedHashMap instead, so that the order follows the creation time of each partition's first file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
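The fix rests on a basic collection property: `HashMap` gives no iteration-order guarantee, while `LinkedHashMap` iterates in insertion order. A minimal sketch (partition names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionOrder {
    // Returns partition keys in the order they were inserted, which a plain
    // HashMap would not guarantee.
    static List<String> insertionOrder() {
        Map<String, Integer> splitsPerPartition = new LinkedHashMap<>();
        splitsPerPartition.put("dt=2023-02-20", 3); // first file created here
        splitsPerPartition.put("dt=2023-02-18", 1); // older partition, written later
        splitsPerPartition.put("dt=2023-02-19", 2);
        return new ArrayList<>(splitsPerPartition.keySet());
    }

    public static void main(String[] args) {
        // LinkedHashMap iterates in insertion order:
        System.out.println(insertionOrder());
        // -> [dt=2023-02-20, dt=2023-02-18, dt=2023-02-19]
    }
}
```

So if entries are inserted as files are discovered, keyed by the creation time of each partition's first file, the generated splits come back in that same order.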
[jira] [Created] (FLINK-31204) HiveCatalogITCase fails due to avro conflict in table store
Shammon created FLINK-31204: --- Summary: HiveCatalogITCase fails due to avro conflict in table store Key: FLINK-31204 URL: https://issues.apache.org/jira/browse/FLINK-31204 Project: Flink Issue Type: Bug Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Shammon

Test fails in IDEA:

{code:java}
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 4 more
Caused by: java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
    at org.apache.flink.table.store.format.avro.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:203)
    at org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:172)
    at org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
    at org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:147)
    at org.apache.flink.table.store.format.avro.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:55)
    at org.apache.flink.table.store.format.avro.AvroFileFormat$AvroGenericRecordBulkFormat.<init>(AvroFileFormat.java:95)
    at org.apache.flink.table.store.format.avro.AvroFileFormat.createReaderFactory(AvroFileFormat.java:80)
    at org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:71)
    at org.apache.flink.table.store.format.FileFormat.createReaderFactory(FileFormat.java:67)
    at org.apache.flink.table.store.file.manifest.ManifestList$Factory.create(ManifestList.java:130)
    at org.apache.flink.table.store.file.operation.AbstractFileStoreScan.<init>(AbstractFileStoreScan.java:95)
    at org.apache.flink.table.store.file.operation.KeyValueFileStoreScan.<init>(KeyValueFileStoreScan.java:57)
    at org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:118)
    at org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:71)
    at org.apache.flink.table.store.file.KeyValueFileStore.newScan(KeyValueFileStore.java:38)
    at org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:116)
    at org.apache.flink.table.store.file.AbstractFileStore.newCommit(AbstractFileStore.java:43)
    at org.apache.flink.table.store.table.AbstractFileStoreTable.newCommit(AbstractFileStoreTable.java:121)
    at org.apache.flink.table.store.connector.sink.FileStoreSink.lambda$createCommitterFactory$63124b4e$1(FileStoreSink.java:69)
    at org.apache.flink.table.store.connector.sink.CommitterOperator.initializeState(CommitterOperator.java:104)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
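A `NoSuchMethodError` like this usually means two avro versions are on the test classpath and the older one won. A generic debugging step (not from the ticket itself) is to ask the JVM which location a class was actually loaded from:

```java
import java.security.CodeSource;

public class WhichJar {
    // Returns the jar/classes directory a class was loaded from; classes on
    // the JDK bootstrap classpath report no CodeSource.
    static String locate(Class<?> clazz) {
        CodeSource src = clazz.getProtectionDomain().getCodeSource();
        return src == null ? "bootstrap classpath" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        // In the failing test you would pass org.apache.avro.Schema.class to
        // see which of the conflicting avro jars won; demonstrated here with
        // a class that is always available.
        System.out.println(locate(java.util.HashMap.class)); // bootstrap classpath
    }
}
```

The printed jar path then tells you which dependency pulls in the old avro and needs an exclusion or a version pin.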
[GitHub] [flink] rkhachatryan commented on pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on PR #21981: URL: https://github.com/apache/flink/pull/21981#issuecomment-1442587044 Thanks for the feedback, @zentol and @dmvk. I've updated the PR; would you mind taking another look? I've significantly restructured the code after the offline discussions. Probably a good place to start is the final versions of the `SlotAllocator` and `SlotAssigner` interfaces. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116364006

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java:

@@ -54,6 +54,18 @@ public interface SlotAllocator {
Optional<VertexParallelism> determineParallelism(JobInformation jobInformation, Collection<? extends SlotInfo> slots);
+/**
+ * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determine
+ * assignment of slots to execution slot sharing groups.
+ */
+default Optional

Review Comment: After an offline discussion, I restructured the interfaces (4fa93e3756fad77cf2dbc92bf77614aa39cc28fe..2fc84d83884c1f342d46fec02d961d4f73eb4219). This particular comment is addressed by 4fa93e3756fad77cf2dbc92bf77614aa39cc28fe:
```
Restructure 1/5: change types passed between AdaptiveScheduler and SlotAssigner

Slot assignments are computed and consumed by SlotAllocator. This is expressed
implicitly by implementing VertexParallelism. This change tries to make that
clear, while still allowing to assign slots to something other than slot
sharing groups. It does so by:
1. Introduce JobSchedulingPlan, computed and consumed by SlotAllocator.
   It couples VertexParallelism with slot assignments.
2. Push the polymorphism of state assignments from VertexParallelism to
   slot assignment target in JobSchedulingPlan.
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
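The commit message above can be sketched as a pair of types: a plan that couples the chosen parallelism with slot assignments, where the assignment target stays generic. This only illustrates the described shape; names and fields are hypothetical, not Flink's actual API:

```java
import java.util.List;
import java.util.Map;

public class JobSchedulingPlanSketch {
    // What the SlotAllocator decided per job vertex (vertex id -> parallelism).
    record VertexParallelism(Map<String, Integer> perVertex) {}

    // A slot paired with what it was assigned to; kept generic so the target
    // can be something other than a slot sharing group, as the commit allows.
    record SlotAssignment<T>(String slotId, T target) {}

    // The coupling: parallelism and slot assignments travel together, so the
    // scheduler no longer smuggles assignments through VertexParallelism.
    record JobSchedulingPlan<T>(VertexParallelism parallelism,
                                List<SlotAssignment<T>> assignments) {}

    static JobSchedulingPlan<String> samplePlan() {
        return new JobSchedulingPlan<>(
                new VertexParallelism(Map.of("map-vertex", 4, "sink-vertex", 2)),
                List.of(new SlotAssignment<>("slot-0", "sharing-group-0"),
                        new SlotAssignment<>("slot-1", "sharing-group-1")));
    }

    public static void main(String[] args) {
        JobSchedulingPlan<String> plan = samplePlan();
        System.out.println(plan.parallelism().perVertex().get("map-vertex")); // 4
        System.out.println(plan.assignments().size()); // 2
    }
}
```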
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116362299 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.stream.StreamSupport; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; + +/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. 
*/ +@Internal +public class StateLocalitySlotAssigner implements SlotAssigner { + +private static class AllocationScore implements Comparable { + +private final String group; +private final AllocationID allocationId; + +public AllocationScore(String group, AllocationID allocationId, int score) { +this.group = group; +this.allocationId = allocationId; +this.score = score; +} + +private final int score; + +public String getGroup() { +return group; +} + +public AllocationID getAllocationId() { +return allocationId; +} + +public int getScore() { +return score; +} + +@Override +public int compareTo(StateLocalitySlotAssigner.AllocationScore other) { +int result = Integer.compare(score, other.score); +if (result != 0) { +return result; +} +result = other.allocationId.compareTo(allocationId); +if (result != 0) { +return result; +} +return other.group.compareTo(group); +} +} + +private final Map> locality; +private final Map maxParallelism; + +public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) { +this( +calculateLocalKeyGroups(archivedExecutionGraph), +StreamSupport.stream( + archivedExecutionGraph.getVerticesTopologically().spliterator(), +false) +.collect( +toMap( +ExecutionJobVertex::getJobVertexId, + ExecutionJobVertex::getMaxParallelism))); +} + +public StateLocalitySlotAssigner( +Map> locality, +Map maxParallelism) { +this.locality = locality; +this.maxParallelism = maxParallelism; +} + +@Override +public AssignmentResult assignSlots( +Collection slots, Collection groups) { + +final Map parallelism = new HashMap<>(); +groups.forEach( +group -> +group.getContainedExecutionVertices() +.forEach( +evi -> +parallelism.merge( +
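The `AllocationScore` ordering in the quoted diff feeds a priority queue, so execution slot sharing groups get matched to the allocations holding the most local key groups first. A self-contained sketch of that greedy matching (names, scores, and tie-breaking are illustrative, not the actual implementation):

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

public class ScoreMatching {
    // (group, allocation) pair with the number of key groups that allocation
    // holds locally for the group -- the "score".
    record Score(String group, String allocationId, int score) {}

    // Greedy matching: repeatedly take the highest-scoring pair whose group
    // and allocation (slot) are both still free.
    static Map<String, String> assign(List<Score> scores) {
        PriorityQueue<Score> queue =
                new PriorityQueue<>(Comparator.<Score>comparingInt(Score::score).reversed());
        queue.addAll(scores);
        Map<String, String> assignment = new LinkedHashMap<>();
        Set<String> usedSlots = new HashSet<>();
        while (!queue.isEmpty()) {
            Score s = queue.poll();
            if (!assignment.containsKey(s.group()) && usedSlots.add(s.allocationId())) {
                assignment.put(s.group(), s.allocationId());
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of(
                new Score("group-1", "alloc-A", 3),
                new Score("group-1", "alloc-B", 7), // most local state for group-1
                new Score("group-2", "alloc-A", 5))));
        // -> {group-1=alloc-B, group-2=alloc-A}
    }
}
```

Group-1 takes alloc-B despite group-2 also wanting alloc-A: the highest score wins first, then the remaining pairs are matched among what is left.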
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116362299 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: (quotes the same StateLocalitySlotAssigner diff as the comment above)
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116360889 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: (quotes the same StateLocalitySlotAssigner diff as the comment above) Review Comment: Fixed by addressing the [comment above](https://github.com/apache/flink/pull/21981#discussion_r1112862609). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116359613 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: (quotes the same StateLocalitySlotAssigner diff as the comment above)
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116359318 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: (quotes the same StateLocalitySlotAssigner diff as the comment above)
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116358878 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java: ## @@ -163,7 +165,12 @@ CompletableFuture goToStopWithSavepoint( /** Interface covering transition to the {@link WaitingForResources} state. */ interface ToWaitingForResources extends StateTransitions { -/** Transitions into the {@link WaitingForResources} state. */ +/** + * Transitions into the {@link WaitingForResources} state without {@link ExecutionGraph} + * (e.g. after creation). + */ void goToWaitingForResources(); +/** Transitions into the {@link WaitingForResources} state (e.g. after restarting). */ +void goToWaitingForResources(ExecutionGraph executionGraph); Review Comment: Removed in 3760a182422db9a7b7cabf420f170346617455fa. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116358657 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.stream.StreamSupport; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; + +/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. 
*/ +@Internal +public class StateLocalitySlotAssigner implements SlotAssigner { + +private static class AllocationScore implements Comparable { + +private final String group; +private final AllocationID allocationId; + +public AllocationScore(String group, AllocationID allocationId, int score) { +this.group = group; +this.allocationId = allocationId; +this.score = score; +} + +private final int score; + +public String getGroup() { +return group; +} + +public AllocationID getAllocationId() { +return allocationId; +} + +public int getScore() { +return score; +} + +@Override +public int compareTo(StateLocalitySlotAssigner.AllocationScore other) { +int result = Integer.compare(score, other.score); +if (result != 0) { +return result; +} +result = other.allocationId.compareTo(allocationId); +if (result != 0) { +return result; +} +return other.group.compareTo(group); +} +} + +private final Map> locality; +private final Map maxParallelism; + +public StateLocalitySlotAssigner(ExecutionGraph archivedExecutionGraph) { +this( +calculateLocalKeyGroups(archivedExecutionGraph), +StreamSupport.stream( + archivedExecutionGraph.getVerticesTopologically().spliterator(), +false) +.collect( +toMap( +ExecutionJobVertex::getJobVertexId, + ExecutionJobVertex::getMaxParallelism))); +} + +public StateLocalitySlotAssigner( +Map> locality, +Map maxParallelism) { +this.locality = locality; +this.maxParallelism = maxParallelism; +} + +@Override +public AssignmentResult assignSlots( +Collection slots, Collection groups) { + +final Map parallelism = new HashMap<>(); +groups.forEach( +group -> +group.getContainedExecutionVertices() +.forEach( +evi -> +parallelism.merge( +
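The `AllocationScore` in the quoted diff ranks candidate (group, allocation) pairs by how many key groups are already local, with deterministic tie-breaking. A simplified, self-contained sketch of the same ordering idea follows — the `Score` class and `best` helper here are hypothetical stand-ins, not Flink's actual code:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified stand-in for the AllocationScore in the quoted diff (hypothetical,
// not Flink's class): a higher score means a better state-locality match.
public class AllocationScoreSketch {
    static final class Score implements Comparable<Score> {
        final String group;
        final String allocationId;
        final int score;

        Score(String group, String allocationId, int score) {
            this.group = group;
            this.allocationId = allocationId;
            this.score = score;
        }

        @Override
        public int compareTo(Score other) {
            // Primary criterion: score; ties broken deterministically by
            // allocation id and group, mirroring the quoted compareTo.
            int result = Integer.compare(score, other.score);
            if (result != 0) {
                return result;
            }
            result = other.allocationId.compareTo(allocationId);
            if (result != 0) {
                return result;
            }
            return other.group.compareTo(group);
        }
    }

    /** Returns the allocation id with the highest score via a max-heap. */
    static String best(Iterable<Score> scores) {
        PriorityQueue<Score> heap = new PriorityQueue<>(Comparator.reverseOrder());
        for (Score s : scores) {
            heap.add(s);
        }
        Score top = heap.poll();
        return top == null ? null : top.allocationId;
    }

    public static void main(String[] args) {
        List<Score> scores = Arrays.asList(
                new Score("g1", "alloc-1", 3),
                new Score("g1", "alloc-2", 7),
                new Score("g2", "alloc-3", 5));
        if (!"alloc-2".equals(best(scores))) {
            throw new AssertionError("expected alloc-2 to win");
        }
    }
}
```

Greedily polling such a max-heap assigns the best-matching slot first, which is the essence of a state-locality-aware assigner.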
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116358477 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116356436 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java: ## @@ -794,7 +802,23 @@ public void goToWaitingForResources() { LOG, desiredResources, this.initialResourceAllocationTimeout, -this.resourceStabilizationTimeout)); +this.resourceStabilizationTimeout, +null)); +} + +@Override +public void goToWaitingForResources(ExecutionGraph executionGraph) { Review Comment: I agree, `state.as` seems error-prone to me. Initial implementation had an error related to that (state was never the one expected).
[GitHub] [flink] rkhachatryan commented on a diff in pull request #21981: [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
rkhachatryan commented on code in PR #21981: URL: https://github.com/apache/flink/pull/21981#discussion_r1116355057 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java: ## @@ -121,16 +133,13 @@ public Optional determineParallelism( slotSharingGroupParallelism.get( slotSharingGroup.getSlotSharingGroupId())); -final Iterable sharedSlotToVertexAssignment = +final List sharedSlotToVertexAssignment = createExecutionSlotSharingGroups(vertexParallelism); -for (ExecutionSlotSharingGroup executionSlotSharingGroup : -sharedSlotToVertexAssignment) { -final SlotInfo slotInfo = slotIterator.next(); - -assignments.add( -new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo)); -} +SlotAssigner.AssignmentResult result = +slotAssigner.assignSlots(freeSlots, sharedSlotToVertexAssignment); +assignments.addAll(result.assignments); +freeSlots = result.remainingSlots; Review Comment: Fixed by restructuring the code and going through all the groups in one go: 2fc84d83884c1f342d46fec02d961d4f73eb4219 .. 4fa93e3756fad77cf2dbc92bf77614aa39cc28fe.
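The diff above replaces a shared slot iterator with an assigner that returns both the assignments it made and the slots it did not consume, so the caller can reuse the remainder for the next slot sharing group. A hypothetical, self-contained sketch of that result-with-remainder pattern (the names here are illustrative, not Flink's API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the "assignment result" pattern in the quoted diff:
// an assigner consumes some free slots and hands the unconsumed ones back,
// instead of sharing one iterator across all slot sharing groups.
public class AssignmentSketch {
    record Assignment(String group, String slot) {}

    record Result(List<Assignment> assignments, List<String> remainingSlots) {}

    static Result assign(List<String> freeSlots, List<String> groups) {
        List<Assignment> assignments = new ArrayList<>();
        Deque<String> slots = new ArrayDeque<>(freeSlots);
        for (String group : groups) {
            if (slots.isEmpty()) {
                break; // not enough slots: remaining groups stay unassigned
            }
            assignments.add(new Assignment(group, slots.poll()));
        }
        // Hand back what was not consumed so the caller can reuse it.
        return new Result(assignments, new ArrayList<>(slots));
    }

    public static void main(String[] args) {
        Result r = assign(List.of("s1", "s2", "s3"), List.of("g1", "g2"));
        if (r.assignments().size() != 2) {
            throw new AssertionError("expected two assignments");
        }
        if (!r.remainingSlots().equals(List.of("s3"))) {
            throw new AssertionError("expected s3 to remain free");
        }
    }
}
```

Threading the remaining slots through explicitly avoids the bug class the reviewer flagged: two call sites silently draining the same iterator.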
[GitHub] [flink] gunnarmorling commented on pull request #21648: [FLINK-30636] [docs]: Typo fix; 'to to' -> 'to'
gunnarmorling commented on PR #21648: URL: https://github.com/apache/flink/pull/21648#issuecomment-1442297939 @rmetzger, could we get this one merged?
[GitHub] [flink-kubernetes-operator] mxm merged pull request #537: [docs] Use Prometheus factory class in metric reporter examples
mxm merged PR #537: URL: https://github.com/apache/flink-kubernetes-operator/pull/537
[GitHub] [flink-web] dannycranmer commented on a diff in pull request #611: Flink 1.15.4
dannycranmer commented on code in PR #611: URL: https://github.com/apache/flink-web/pull/611#discussion_r1116091205 ## docs/content/posts/2023-02-23-release-1.15.4.md: ## @@ -0,0 +1,152 @@ +--- +authors: +- fapaul: null + name: Danny Cranmer +date: "2023-02-23T17:00:00Z" +excerpt: The Apache Flink Community is pleased to announce a bug fix release for Flink + 1.15. +title: Apache Flink 1.15.4 Release Announcement +aliases: +- /news/2023/02/23/release-1.15.4.html +--- + +The Apache Flink Community is pleased to announce the third bug fix release of the Flink 1.15 series. Review Comment: Thanks, i missed that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] snuyanzin commented on a diff in pull request #611: Flink 1.15.4
snuyanzin commented on code in PR #611: URL: https://github.com/apache/flink-web/pull/611#discussion_r1116071377 ## docs/content/posts/2023-02-23-release-1.15.4.md: ## @@ -0,0 +1,152 @@ +--- +authors: +- fapaul: null + name: Danny Cranmer +date: "2023-02-23T17:00:00Z" +excerpt: The Apache Flink Community is pleased to announce a bug fix release for Flink + 1.15. +title: Apache Flink 1.15.4 Release Announcement +aliases: +- /news/2023/02/23/release-1.15.4.html +--- + +The Apache Flink Community is pleased to announce the third bug fix release of the Flink 1.15 series. Review Comment: ```suggestion The Apache Flink Community is pleased to announce the fourth bug fix release of the Flink 1.15 series. ``` According to https://github.com/apache/flink-web/blob/147d1c0d5823006d360d410d771cf55c18d02e53/docs/content/posts/2022-11-10-release-1.15.3.md?plain=1#L13 1.15.3 was the third, or did I miss something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…
snuyanzin commented on code in PR #21993: URL: https://github.com/apache/flink/pull/21993#discussion_r1116007389 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java: ## @@ -31,24 +31,28 @@ import java.util.Optional; -import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast; - /** Specific {@link ArgumentTypeStrategy} for {@link BuiltInFunctionDefinitions#ARRAY_CONTAINS}. */ @Internal class ArrayElementArgumentTypeStrategy implements ArgumentTypeStrategy { +private final boolean preserveNullability; + +public ArrayElementArgumentTypeStrategy(boolean preserveNullability) { +this.preserveNullability = preserveNullability; +} + @Override public Optional inferArgumentType( CallContext callContext, int argumentPos, boolean throwOnFailure) { final ArrayType haystackType = (ArrayType) callContext.getArgumentDataTypes().get(0).getLogicalType(); final LogicalType haystackElementType = haystackType.getElementType(); -final LogicalType needleType = - callContext.getArgumentDataTypes().get(argumentPos).getLogicalType(); -if (supportsImplicitCast(needleType, haystackElementType)) { Review Comment: I didn't get: check for implicit cast should be removed? why? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-27954) JobVertexFlameGraphHandler does not work on standby Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-27954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-27954: -- Assignee: Weijie Guo > JobVertexFlameGraphHandler does not work on standby Dispatcher > -- > > Key: FLINK-27954 > URL: https://issues.apache.org/jira/browse/FLINK-27954 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / REST >Affects Versions: 1.15.0 >Reporter: Chesnay Schepler >Assignee: Weijie Guo >Priority: Major > Fix For: 1.17.0, 1.16.2, 1.15.5 > > > The {{JobVertexFlameGraphHandler}} relies internally on the > {{JobVertexThreadInfoTracker}} which calls > {{ResourceManagerGateway#requestTaskExecutorThreadInfoGateway}} to get a > gateway for requesting the thread info from the task executors. Since this > gateway is not serializable it would categorically fail if called from a > standby dispatcher. > Instead this should follow the logic of the {{MetricFetcherImpl}}, which > requests addresses instead and manually connects to the task executors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
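The issue above hinges on a general rule: a live RPC gateway object cannot be shipped across processes, while a plain address string can, letting the receiver connect on its own (the `MetricFetcherImpl`-style approach the issue recommends). A miniature, self-contained illustration — the `ThreadInfoGateway` type here is hypothetical, not Flink's:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

// Illustrates the issue described above in miniature: an RPC gateway holds
// live connection state and is not Serializable, so it cannot be sent to a
// standby process. A plain address string serializes fine, and the standby
// side reconnects itself.
public class AddressVsGateway {
    static final class ThreadInfoGateway { /* live connection state; not Serializable */ }

    static byte[] trySerialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Shipping the gateway object fails:
        boolean failed = false;
        try {
            trySerialize(new ThreadInfoGateway());
        } catch (NotSerializableException e) {
            failed = true;
        }
        if (!failed) {
            throw new AssertionError("gateway should not be serializable");
        }

        // Shipping the address succeeds; the receiver connects on its own.
        byte[] bytes = trySerialize("taskexecutor-1.example.com:6122");
        if (bytes.length == 0) {
            throw new AssertionError("address should serialize");
        }
    }
}
```

This is why requesting addresses and connecting manually works from a standby dispatcher where requesting gateways categorically fails.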
[GitHub] [flink-kubernetes-operator] mxm opened a new pull request, #537: [docs] Use Prometheus factory class in metric reporter examples
mxm opened a new pull request, #537: URL: https://github.com/apache/flink-kubernetes-operator/pull/537 Using the reporter class directly is not supported anymore with Flink 1.16.
[jira] [Commented] (FLINK-27954) JobVertexFlameGraphHandler does not work on standby Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-27954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692801#comment-17692801 ] Weijie Guo commented on FLINK-27954: Thanks [~chesnay] for reporting this and giving fix suggestion, I'd like to do this. > JobVertexFlameGraphHandler does not work on standby Dispatcher > -- > > Key: FLINK-27954 > URL: https://issues.apache.org/jira/browse/FLINK-27954 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / REST >Affects Versions: 1.15.0 >Reporter: Chesnay Schepler >Priority: Major > Fix For: 1.17.0, 1.16.2, 1.15.5 > > > The {{JobVertexFlameGraphHandler}} relies internally on the > {{JobVertexThreadInfoTracker}} which calls > {{ResourceManagerGateway#requestTaskExecutorThreadInfoGateway}} to get a > gateway for requesting the thread info from the task executors. Since this > gateway is not serializable it would categorically fail if called from a > standby dispatcher. > Instead this should follow the logic of the {{MetricFetcherImpl}}, which > requests addresses instead and manually connects to the task executors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…
snuyanzin commented on code in PR #21993: URL: https://github.com/apache/flink/pull/21993#discussion_r1116011807 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java: ## @@ -71,7 +71,11 @@ public final class SpecificInputTypeStrategies { /** Argument type derived from the array element type. */ public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG = -new ArrayElementArgumentTypeStrategy(); +new ArrayElementArgumentTypeStrategy(false); + +/** Argument type derived from the array element type. But leaves nullability untouched. */ Review Comment: That seems not true. Example: array[1, 2, 3] has nullability `NOT NULL`, and `ARRAY_ELEMENT_ARG_NULLABLE` makes it nullable. So it looks like it doesn't leave nullability untouched.
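The nullability point in the review above can be shown with a miniature model of type derivation (this is an illustrative model, not Flink's `LogicalType` API): a strategy that always returns a nullable copy of the array's element type does not leave a `NOT NULL` element type, such as the one of `ARRAY[1, 2, 3]`, untouched.

```java
// Miniature model (hypothetical, not Flink's type system) of the nullability
// question in the review above: forcing nullability turns a NOT NULL element
// type into a nullable one, so "leaves nullability untouched" would be wrong.
public class NullabilityModel {
    record LogicalType(String name, boolean nullable) {
        LogicalType asNullable() {
            return new LogicalType(name, true);
        }
    }

    static LogicalType inferNeedleType(LogicalType elementType, boolean forceNullable) {
        return forceNullable ? elementType.asNullable() : elementType;
    }

    public static void main(String[] args) {
        // Element type of ARRAY[1, 2, 3]: INT NOT NULL.
        LogicalType intNotNull = new LogicalType("INT", false);
        if (!inferNeedleType(intNotNull, true).nullable()) {
            throw new AssertionError("forcing nullability should yield a nullable type");
        }
        if (inferNeedleType(intNotNull, false).nullable()) {
            throw new AssertionError("without forcing, nullability is preserved");
        }
    }
}
```

The Javadoc under review should therefore describe the strategy as making the derived argument type nullable, not as preserving nullability.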
[jira] [Resolved] (FLINK-31183) Flink Kinesis EFO Consumer can fail to stop gracefully
[ https://issues.apache.org/jira/browse/FLINK-31183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-31183. --- Resolution: Fixed > Flink Kinesis EFO Consumer can fail to stop gracefully > -- > > Key: FLINK-31183 > URL: https://issues.apache.org/jira/browse/FLINK-31183 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.3, 1.16.1, aws-connector-4.0.0 >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: 1.15.4, aws-connector-4.1.0, 1.16.2 > > > *Background* > When stopping a Flink job using the stop-with-savepoint API the EFO Kinesis > source can fail to close gracefully. > > Sample stack trace > {code:java} > 2023-02-16 20:45:40 > org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed. > at > org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013) > at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103) > at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name > with subtask : Source: vas_source_stream (38/48)#0 Failure reason: Task has > failed. 
> at > org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1395) > at > org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1338) > at > java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) > at > java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) > Caused by: java.util.concurrent.CompletionException: > java.util.concurrent.RejectedExecutionException: event executor terminated > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) > at > java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063) > ... 
3 more > Caused by: java.util.concurrent.RejectedExecutionException: event executor > terminated > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher$ChannelSubscription.cancel(HandlerPublisher.java:502) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.cancel(DelegatingSubscription.java:37) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription.cancel(Http2ResetSendingSubscription.java:41) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.DelegatingSubscription.cancel(DelegatingSubscription.java:37) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$OnCancelSubscription.cancel(ResponseHandler.java:409) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.async.FlatteningSubscriber$1.cancel
[jira] [Comment Edited] (FLINK-31183) Flink Kinesis EFO Consumer can fail to stop gracefully
[ https://issues.apache.org/jira/browse/FLINK-31183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692651#comment-17692651 ]

Danny Cranmer edited comment on FLINK-31183 at 2/23/23 5:05 PM:

Merged commit [{{a8c34db}}|https://github.com/apache/flink/commit/a8c34db3d601c534f92e68a2709a6467eb94276e] into apache:release-1.15
Merged commit [{{cd7b049}}|https://github.com/apache/flink/commit/cd7b0495bcdadc3a9808a475be819c9808d5f17e] into apache:release-1.16
Merged commit [{{fdfe982}}|https://github.com/apache/flink-connector-aws/commit/fdfe9821b36027e9afd8db4d32ac8eff080dad2d] into apache:main

was (Author: dannycranmer):
Merged commit [{{a8c34db}}|https://github.com/apache/flink/commit/a8c34db3d601c534f92e68a2709a6467eb94276e] into apache:release-1.15
Merged commit [{{fdfe982}}|https://github.com/apache/flink-connector-aws/commit/fdfe9821b36027e9afd8db4d32ac8eff080dad2d] into apache:main

> Flink Kinesis EFO Consumer can fail to stop gracefully
> ------------------------------------------------------
>
>                 Key: FLINK-31183
>                 URL: https://issues.apache.org/jira/browse/FLINK-31183
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.15.3, 1.16.1, aws-connector-4.0.0
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.4, aws-connector-4.1.0, 1.16.2
>
> *Background*
> When stopping a Flink job using the stop-with-savepoint API, the EFO Kinesis
> source can fail to close gracefully.
>
> Sample stack trace:
> {code:java}
> 2023-02-16 20:45:40
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
>     at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: vas_source_stream (38/48)#0 Failure reason: Task has failed.
>     at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1395)
>     at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1338)
>     at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
>     at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: event executor terminated
>     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>     at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
>     ... 3 more
> Caused by: java.util.concurrent.RejectedExecutionException: event executor terminated
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.n
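The root cause in the trace above is work being submitted to a Netty event executor that has already terminated during shutdown. The same race can be demonstrated with a plain JDK `ExecutorService`; the sketch below is a minimal illustration of one common guard-and-fallback pattern, not Flink's actual fix for FLINK-31183 (class and method names here are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class GracefulSubmit {
    /**
     * Submits work to the executor, falling back to the caller's thread once
     * the executor has been shut down. Returns true if the executor accepted
     * the task, false if it had to run inline.
     */
    static boolean trySubmit(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
            return true; // accepted by the executor
        } catch (RejectedExecutionException e) {
            // The executor has terminated -- the failure mode in the stack
            // trace above. Run the task inline instead of letting the
            // exception propagate into the checkpoint/stop path.
            task.run();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // after shutdown(), execute() rejects new tasks
        boolean accepted = trySubmit(executor, () -> {});
        System.out.println(accepted); // false: the task ran inline
    }
}
```

Without such a guard, any callback scheduled onto the terminated executor surfaces as the `RejectedExecutionException: event executor terminated` seen above, which in turn fails the final checkpoint of stop-with-savepoint.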
[GitHub] [flink] dannycranmer merged pull request #22008: [FLINK-31183][Connector/Kinesis] Fix bug where EFO Consumer can fail …
dannycranmer merged PR #22008: URL: https://github.com/apache/flink/pull/22008 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhuzhurk commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase
zhuzhurk commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1115923261

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:

@@ -928,14 +1011,8 @@ private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo cha
         return new StreamConfig(jobVertex.getConfiguration());
     }

-    private void setVertexConfig(
-            Integer vertexID,
-            StreamConfig config,
-            List chainableOutputs,
-            List nonChainableOutputs,
-            Map chainedSources) {
-
-        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
+    private void setOperatorConfig(
+            Integer vertexID, StreamConfig config, Map chainedSources) {

Review Comment: vertexID -> vertexId

## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroup.java:

@@ -53,9 +55,22 @@ public ForwardGroup(final Set jobVertices) {
                 .map(JobVertex::getParallelism)
                 .collect(Collectors.toSet());

-        checkState(decidedParallelisms.size() <= 1);
-        if (decidedParallelisms.size() == 1) {
-            this.parallelism = decidedParallelisms.iterator().next();
+        checkState(configuredParallelisms.size() <= 1);
+        if (configuredParallelisms.size() == 1) {
+            this.parallelism = configuredParallelisms.iterator().next();
+        }
+
+        Set configuredMaxParallelisms =
+                jobVertices.stream()
+                        .map(JobVertex::getMaxParallelism)
+                        .filter(val -> val > 0)
+                        .collect(Collectors.toSet());
+
+        if (!configuredMaxParallelisms.isEmpty()) {
+            this.maxParallelism = Collections.min(configuredMaxParallelisms);
+            checkState(
+                    maxParallelism >= parallelism,

Review Comment: The `parallelism` can be -1, while the `maxParallelism` is configured.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:

@@ -758,6 +771,76 @@ private List createChain(
         }
     }

+    private void setVertexParallelismsForDynamicGraphIfNecessary() {

Review Comment: Better to add some comments to explain what this method is for, e.g. it ensures that the parallelism and maxParallelism of vertices in the same forward group are the same, and sets the parallelism at an early stage if possible, to avoid invalid partition reuse.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:

@@ -1955,6 +2091,14 @@ private OperatorID addNodeToChain(int currentNodeId, String operatorName) {
         return new OperatorID(primaryHashBytes);
     }

+    private void setTransitiveOutEdges(final List transitiveOutEdges) {
+        this.transitiveOutEdges.addAll(transitiveOutEdges);
+    }
+
+    public List getTransitiveOutEdges() {

Review Comment:
```suggestion
    private List getTransitiveOutEdges() {
```

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:

@@ -568,12 +592,7 @@ && isChainableInput(sourceOutEdge, streamGraph)) {
             final StreamConfig.SourceInputConfig inputConfig =
                     new StreamConfig.SourceInputConfig(sourceOutEdge);
             final StreamConfig operatorConfig = new StreamConfig(new Configuration());
-            setVertexConfig(
-                    sourceNodeId,
-                    operatorConfig,
-                    Collections.emptyList(),
-                    Collections.emptyList(),
-                    Collections.emptyMap());
+            setOperatorConfig(sourceNodeId, operatorConfig, Collections.emptyMap());

Review Comment: IIUC, `setChainedOutputs(emptyList)` would be called here previously, but is no longer called after this change?
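The edge case flagged in the `ForwardGroup` review — a vertex's parallelism may still be the default (-1) while its maxParallelism is already configured — means the `maxParallelism >= parallelism` check must only fire once the parallelism is actually decided. The standalone sketch below illustrates that derivation; it is a simplified, hypothetical model (plain `Set<Integer>` inputs instead of `JobVertex`), not the actual Flink class:

```java
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

public class ForwardGroupSketch {
    static final int DEFAULT = -1; // mirrors ExecutionConfig.PARALLELISM_DEFAULT

    final int parallelism;
    final int maxParallelism;

    /** Derives a single (parallelism, maxParallelism) pair for a forward group. */
    ForwardGroupSketch(Set<Integer> parallelisms, Set<Integer> maxParallelisms) {
        // All explicitly configured (positive) parallelisms must agree.
        Set<Integer> configured =
                parallelisms.stream().filter(p -> p > 0).collect(Collectors.toSet());
        if (configured.size() > 1) {
            throw new IllegalStateException("conflicting parallelisms in forward group");
        }
        parallelism = configured.isEmpty() ? DEFAULT : configured.iterator().next();

        // The group's maxParallelism is the minimum of the configured ones.
        Set<Integer> configuredMax =
                maxParallelisms.stream().filter(p -> p > 0).collect(Collectors.toSet());
        maxParallelism = configuredMax.isEmpty() ? DEFAULT : Collections.min(configuredMax);

        // Only compare when the parallelism has been decided: it may still be
        // -1 while a maxParallelism is already configured (the reviewed edge case).
        if (parallelism != DEFAULT && maxParallelism != DEFAULT && maxParallelism < parallelism) {
            throw new IllegalStateException("maxParallelism < parallelism");
        }
    }
}
```

For example, `new ForwardGroupSketch(Set.of(-1), Set.of(4))` yields parallelism -1 with maxParallelism 4 without tripping the check, while `Set.of(8, -1)` with `Set.of(16, 32)` yields (8, 16).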