[GitHub] [flink] XuQianJin-Stars commented on a change in pull request #11186: [FLINK-16200][sql] Support JSON_EXISTS for blink planner
XuQianJin-Stars commented on a change in pull request #11186: [FLINK-16200][sql] Support JSON_EXISTS for blink planner URL: https://github.com/apache/flink/pull/11186#discussion_r403428053
## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/JsonFunctionsTest.scala
## @@ -125,4 +127,53 @@ class JsonFunctionsTest extends ExpressionTestBase {

  @Test
  def testJsonExists(): Unit = {
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'strict $.foo' false on error)", "true")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'strict $.foo' true on error)", "true")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'strict $.foo' unknown on error)", "true")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'lax $.foo' false on error)", "true")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'lax $.foo' true on error)", "true")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'lax $.foo' unknown on error)", "true")
    testSqlApi("json_exists('{}', 'invalid $.foo' false on error)", "false")
    testSqlApi("json_exists('{}', 'invalid $.foo' true on error)", "true")
    testSqlApi("json_exists('{}', 'invalid $.foo' unknown on error)", "null")
    testSqlApi("json_exists(cast('{\"foo\":\"bar\"}' as varchar), 'strict $.foo1')", "false")

    // not exists
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'strict $.foo1' false on error)", "false")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'strict $.foo1' true on error)", "true")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'strict $.foo1' unknown on error)", "null")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'lax $.foo1' true on error)", "false")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'lax $.foo1' false on error)", "false")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'lax $.foo1' error on error)", "false")
    testSqlApi("json_exists('{\"foo\":\"bar\"}', 'lax $.foo1' unknown on error)", "false")

    // nulls
    testSqlApi("json_exists(cast(null as varchar), 'lax $' unknown on error)", "null")
  }

  @Test
  def testJsonFuncError(): Unit = {
    expectedException.expect(classOf[CodeGenException])
    expectedException.expectMessage(startsWith("Unsupported call: JSON_EXISTS"))

Review comment:
> This exception message is still misleading. We already support `JSON_EXISTS`, so why does the exception say it is not supported? I think we should improve the exception to give a more understandable message, e.g. `the json path 'lax $' is illegal.`

Because `JSON_EXISTS (INT, CHAR (5) NOT NULL, RAW ('org.apache.calcite.sql.SqlJsonExistsErrorBehavior',?)` is not supported, while an illegal json path such as 'lax $' will simply return `false`, like `verifyException`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
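The strict/lax modes and the FALSE/TRUE/UNKNOWN ON ERROR clauses exercised by these tests can be illustrated outside of Flink. The following is a minimal Python sketch, not Flink or Calcite code: the helper name `json_exists` is an assumption, and it only handles single-member paths of the form `$.key` as used in the tests above.

```python
import json

def json_exists(doc, path, on_error="FALSE"):
    """Approximate SQL JSON_EXISTS for paths like 'strict $.key' or
    'lax $.key'; returns True, False, or None (the SQL UNKNOWN)."""
    on_error_result = {"FALSE": False, "TRUE": True, "UNKNOWN": None}
    try:
        mode, expr = path.split(" ", 1)
        if mode not in ("strict", "lax"):
            raise ValueError(f"illegal json path: {path!r}")
        obj = json.loads(doc)          # raises ValueError on bad JSON
        if expr == "$":
            return True
        key = expr[2:]                 # strip the leading "$."
        if key in obj:
            return True
        if mode == "strict":
            raise KeyError(key)        # strict: a missing member is an error
        return False                   # lax: a missing member is just "not found"
    except (ValueError, KeyError):
        return on_error_result[on_error]

# mirrors the expectations in the test above
assert json_exists('{"foo":"bar"}', 'strict $.foo', "FALSE") is True
assert json_exists('{"foo":"bar"}', 'strict $.foo1', "UNKNOWN") is None
assert json_exists('{"foo":"bar"}', 'lax $.foo1', "TRUE") is False
assert json_exists('{}', 'invalid $.foo', "TRUE") is True
```

The actual implementation delegates to full SQL/JSON path semantics; this sketch only reproduces the error-handling shape that the test matrix covers.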
[GitHub] [flink] flinkbot edited a comment on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed
flinkbot edited a comment on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed URL: https://github.com/apache/flink/pull/11630#issuecomment-608974004 ## CI report: * 9a9073f279a1bc2bb44352199134c372825cb769 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158328185) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7054) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed
flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed URL: https://github.com/apache/flink/pull/11630#issuecomment-608974004 ## CI report: * 9a9073f279a1bc2bb44352199134c372825cb769 UNKNOWN
[GitHub] [flink] flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed
flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed URL: https://github.com/apache/flink/pull/11630#issuecomment-608972585 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9a9073f279a1bc2bb44352199134c372825cb769 (Sat Apr 04 04:38:33 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] wangyang0918 opened a new pull request #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed
wangyang0918 opened a new pull request #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed URL: https://github.com/apache/flink/pull/11630 ## What is the purpose of the change The pods may be pending because of insufficient resources, disk pressure, or other problems, in which case `wait_rest_endpoint_up` will time out. Describing all resources will help to debug these problems. We still have some failed instances that we cannot reproduce in a local environment (Mac/Linux). This PR is opened to run the e2e tests more times to find the root cause. ## Brief change log * Describe all resources so that we can find more information about why the K8s e2e tests failed * Debug logs sometimes do not show up, so move `debug_and_show_logs` before `cleanup` ## Verifying this change * Run the e2e tests more times; K8s-related tests should pass ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[GitHub] [flink] liuzhixing1006 commented on issue #11455: [FLINK-16098] [chinese-translation, Documentation] Translate "Overvie…
liuzhixing1006 commented on issue #11455: [FLINK-16098] [chinese-translation, Documentation] Translate "Overvie… URL: https://github.com/apache/flink/pull/11455#issuecomment-608969822 Hi @wuchong @JingsongLi, it seems that this PR has met the standard; could you help to merge it? Thank you!
[jira] [Commented] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart
[ https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17075023#comment-17075023 ] Lu Niu commented on FLINK-16931: Thanks in advance! Will share the plan later. > Large _metadata file lead to JobManager not responding when restart > --- > > Key: FLINK-16931 > URL: https://issues.apache.org/jira/browse/FLINK-16931 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.9.2, 1.10.0, 1.11.0 >Reporter: Lu Niu >Assignee: Lu Niu >Priority: Critical > Fix For: 1.11.0 > > > When the _metadata file is big, the JobManager can never recover from the checkpoint. > It falls into a loop of fetch checkpoint -> JM timeout -> restart. Here is the > related log: > {code:java} > 2020-04-01 17:08:25,689 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - > Recovering checkpoints from ZooKeeper. > 2020-04-01 17:08:25,698 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found > 3 checkpoints in ZooKeeper. > 2020-04-01 17:08:25,698 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - > Trying to fetch 3 checkpoints from storage. > 2020-04-01 17:08:25,698 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - > Trying to retrieve checkpoint 50. > 2020-04-01 17:08:48,589 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - > Trying to retrieve checkpoint 51. > 2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The > heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out. > {code} > Digging into the code, it looks like ExecutionGraph::restart runs in the JobMaster > main thread and finally calls > ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads > the file from DFS. The main thread is basically blocked for a while because of > this.
One possible solution is to make the downloading part async. More > things might need to be considered, as the original change tries to make it > single-threaded. [https://github.com/apache/flink/pull/7568] -- This message was sent by Atlassian Jira (v8.3.4#803005)
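The proposed fix, moving the blocking DFS download off the main thread, can be sketched as follows. This is illustrative Python, not Flink code: `retrieve_checkpoint` and the thread-pool setup are hypothetical stand-ins for the real `ZooKeeperCompletedCheckpointStore` logic and a dedicated IO executor.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# a dedicated pool for IO so the "main thread" never blocks on DFS
io_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="checkpoint-io")

def retrieve_checkpoint(checkpoint_id):
    # stands in for downloading a large _metadata file from DFS
    time.sleep(0.05)
    return {"id": checkpoint_id}

def recover_blocking(ids):
    # current behavior: each download stalls the caller, so a big
    # _metadata file starves heartbeat handling and the JM times out
    return [retrieve_checkpoint(i) for i in ids]

def recover_async(ids):
    # proposed behavior: schedule downloads on the IO pool; the main
    # thread only submits work and stays responsive to heartbeats
    futures = [io_executor.submit(retrieve_checkpoint, i) for i in ids]
    return [f.result() for f in futures]  # real code would compose futures, not block
```

The subtlety the comment hints at is the last line: simply calling `result()` on the main thread reintroduces the block, so the real change has to chain continuations instead, which interacts with the single-threaded design of PR #7568.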
[jira] [Commented] (FLINK-16724) ListSerializer cannot serialize a list which contains null
[ https://issues.apache.org/jira/browse/FLINK-16724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074991#comment-17074991 ] Congxian Qiu(klion26) commented on FLINK-16724: --- [~nppoly] What do you think about my comment? If you agree with it, could you please close this ticket? PS: FLINK-16916 has been fixed, so you can now try to use {{NullableSerializer}} in the new master codebase. > ListSerializer cannot serialize a list which contains null > -- > > Key: FLINK-16724 > URL: https://issues.apache.org/jira/browse/FLINK-16724 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Chongchen Chen >Priority: Major > Attachments: list_serializer_err.diff > > > MapSerializer handles null values correctly, but ListSerializer doesn't. The > attached diff modifies the unit test to reproduce the bug.
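The asymmetry being reported (a map serializer guarding null values with a marker byte while the list serializer does not) can be shown with a toy length-prefixed format. This is a Python sketch of the general technique only, not Flink's actual wire format; all names are illustrative.

```python
import struct

def serialize_list(values, write_element):
    """Length-prefixed list with one 'is null' marker byte per element,
    mirroring how a null-safe map serializer guards its values."""
    out = struct.pack(">i", len(values))
    for v in values:
        if v is None:
            out += b"\x01"                    # null marker, no payload
        else:
            out += b"\x00" + write_element(v)  # present marker + payload
    return out

def deserialize_list(buf, read_element, elem_size):
    (n,) = struct.unpack_from(">i", buf)
    pos, result = 4, []
    for _ in range(n):
        is_null, pos = buf[pos], pos + 1
        if is_null:
            result.append(None)
        else:
            result.append(read_element(buf[pos:pos + elem_size]))
            pos += elem_size
    return result

write_int = lambda v: struct.pack(">i", v)
read_int = lambda b: struct.unpack(">i", b)[0]

# nulls round-trip because each element carries its own marker
data = serialize_list([1, None, 3], write_int)
assert deserialize_list(data, read_int, 4) == [1, None, 3]
```

Without the marker byte, a serializer that writes elements back to back has no way to represent `None`, which is the failure mode the attached diff demonstrates.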
[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task. URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084 ## CI report: * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN * 7da912a7e2bd854fec07c6eb6e7784fba30df765 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158315901) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7053)
[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task. URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084 ## CI report: * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015) * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN * 7da912a7e2bd854fec07c6eb6e7784fba30df765 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158289333) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7051)
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158289333) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7051)
[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566 ## CI report: * c4207aa80f6279d013a51b9104997e840716640e Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158278462) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7049)
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047) * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158289333) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7051)
[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566 ## CI report: * c4207aa80f6279d013a51b9104997e840716640e Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158278462) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7049)
[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093 ## CI report: * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN * a8335f0def293f04fd74bfc47c1b31fc98a46a23 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158276096) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7048)
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047) * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e UNKNOWN
[jira] [Closed] (FLINK-16710) Log Upload blocks Main Thread in TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-16710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-16710. Resolution: Fixed master: dbf0c4c5914d11b6c1209f089ed014db8cd733cb > Log Upload blocks Main Thread in TaskExecutor > - > > Key: FLINK-16710 > URL: https://issues.apache.org/jira/browse/FLINK-16710 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.10.0 >Reporter: Gary Yao >Assignee: wangsan >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Uploading logs to the BlobServer blocks the TaskExecutor's main thread. We > should introduce an IO thread pool that carries out file system accesses > (listing files in a directory, checking if file exists, uploading files). > Affected RPCs: > * {{TaskExecutor#requestLogList(Time)}} > * {{TaskExecutor#requestFileUploadByName(String, Time)}} > * {{TaskExecutor#requestFileUploadByType(FileType, Time)}}
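The fix described in the ticket, routing file-system access through an IO thread pool so RPC handlers return immediately, follows a common pattern. Below is a minimal Python sketch under stated assumptions: the class and method names are hypothetical stand-ins for the TaskExecutor RPCs, and Python futures stand in for Flink's CompletableFuture-based RPC results.

```python
import os
from concurrent.futures import Future, ThreadPoolExecutor

class FileServer:
    """RPC handlers return futures immediately; listing and upload work
    runs on a dedicated IO pool so the main thread is never blocked."""

    def __init__(self, log_dir):
        self.log_dir = log_dir
        self.io_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="file-io")

    def request_log_list(self) -> Future:
        # directory listing can stall on slow disks/NFS; never call it inline
        return self.io_pool.submit(os.listdir, self.log_dir)

    def request_file_upload(self, name) -> Future:
        def upload():
            path = os.path.join(self.log_dir, name)
            with open(path, "rb") as f:
                return len(f.read())  # stand-in for pushing bytes to a blob server
        return self.io_pool.submit(upload)

server = FileServer(".")
files = server.request_log_list().result()  # caller blocks, not the server's main thread
```

The key property is that `request_log_list` and `request_file_upload` themselves do no IO; they only schedule it, which is what keeps the main-thread RPC loop responsive.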
[GitHub] [flink] GJL closed pull request #11571: [FLINK-16710][runtime] Log Upload blocks Main Thread in TaskExecutor
GJL closed pull request #11571: [FLINK-16710][runtime] Log Upload blocks Main Thread in TaskExecutor URL: https://github.com/apache/flink/pull/11571
[jira] [Reopened] (FLINK-16921) "kubernetes session test" is unstable
[ https://issues.apache.org/jira/browse/FLINK-16921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reopened FLINK-16921: The test failed again: https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7046=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5 > "kubernetes session test" is unstable > - > > Key: FLINK-16921 > URL: https://issues.apache.org/jira/browse/FLINK-16921 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes, Tests >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Assignee: Yang Wang >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > CI: > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6915=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5 > I assume some services didn't come up? > {code} > Caused by: java.util.concurrent.CompletionException: > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: > Connection refused: /10.1.0.4:30095 > {code} > Full log > {code} > 2020-04-01T09:13:59.0673858Z Successfully built ba628fa7af0d > 2020-04-01T09:13:59.0726818Z Successfully tagged > test_kubernetes_session:latest > 2020-04-01T09:13:59.2547709Z > clusterrolebinding.rbac.authorization.k8s.io/flink-role-binding-default > created > 2020-04-01T09:14:00.0586087Z 2020-04-01 09:14:00,055 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.rpc.address, localhost > 2020-04-01T09:14:00.0608876Z 2020-04-01 09:14:00,060 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.rpc.port, 6123 > 2020-04-01T09:14:00.0611236Z 2020-04-01 09:14:00,060 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.heap.size, 1024m > 2020-04-01T09:14:00.0613869Z 2020-04-01 
09:14:00,061 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: taskmanager.memory.process.size, 1728m > 2020-04-01T09:14:00.0616344Z 2020-04-01 09:14:00,061 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: taskmanager.numberOfTaskSlots, 1 > 2020-04-01T09:14:00.0619384Z 2020-04-01 09:14:00,061 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: parallelism.default, 1 > 2020-04-01T09:14:00.0624467Z 2020-04-01 09:14:00,062 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.execution.failover-strategy, region > 2020-04-01T09:14:00.9838038Z 2020-04-01 09:14:00,983 INFO > org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The > derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is > less than its min value 192.000mb (201326592 bytes), min value will be used > instead > 2020-04-01T09:14:00.9922554Z 2020-04-01 09:14:00,991 INFO > org.apache.flink.kubernetes.utils.KubernetesUtils[] - Kubernetes > deployment requires a fixed port. Configuration blob.server.port will be set > to 6124 > 2020-04-01T09:14:00.9927409Z 2020-04-01 09:14:00,992 INFO > org.apache.flink.kubernetes.utils.KubernetesUtils[] - Kubernetes > deployment requires a fixed port. Configuration taskmanager.rpc.port will be > set to 6122 > 2020-04-01T09:14:01.0587014Z 2020-04-01 09:14:01,058 WARN > org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] > - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop > Configuration ConfigMap. > 2020-04-01T09:14:01.0592498Z 2020-04-01 09:14:01,059 WARN > org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] > - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop > Configuration ConfigMap. 
> 2020-04-01T09:14:01.8684880Z 2020-04-01 09:14:01,868 INFO > org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create > flink session cluster flink-native-k8s-session-1 successfully, JobManager Web > Interface: http://10.1.0.4:30095 > 2020-04-01T09:14:03.2952029Z Executing WordCount example with default input > data set. > 2020-04-01T09:14:03.2955365Z Use --input to specify file input. > 2020-04-01T09:15:31.5606577Z > 2020-04-01T09:15:31.5610358Z > > 2020-04-01T09:15:31.5610913Z The program finished with the following > exception: > 2020-04-01T09:15:31.564Z > 2020-04-01T09:15:31.5611772Z >
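The "Connection refused" above occurs when the client contacts the JobManager web interface right after cluster creation, which is consistent with the endpoint being probed before the service is actually up. A plain-JDK sketch of the kind of readiness probe a flaky e2e test could run before submitting the job (the host/port are taken from the log for illustration only; this is not the actual test code):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortReadiness {

    /** Polls host:port until a TCP connect succeeds or the deadline passes. */
    static boolean waitForPort(String host, int port, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 1000);
                return true; // something is listening
            } catch (IOException notYetUp) {
                Thread.sleep(500); // back off and retry
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // In the real test this would be the JobManager REST endpoint from the
        // log, e.g. waitForPort("10.1.0.4", 30095, 120_000). Here we probe a
        // local listener so the demo is self-contained.
        try (java.net.ServerSocket server = new java.net.ServerSocket(0)) {
            boolean up = waitForPort("127.0.0.1", server.getLocalPort(), 5_000);
            System.out.println(up ? "endpoint is up" : "gave up waiting");
        }
    }
}
```

Such a wait loop turns a hard connection failure into a bounded startup grace period, which is usually what makes this class of test stable.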
[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566 ## CI report: * 668797635ab529ea21ef234a1f99747cfb4d898a Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158015561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7006) * c4207aa80f6279d013a51b9104997e840716640e Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158278462) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7049) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093 ## CI report: * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN * 49d5d3e1e8e1bf962716241b79db1f2e28e506b4 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158188400) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7040) * a8335f0def293f04fd74bfc47c1b31fc98a46a23 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158276096) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7048)
[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566 ## CI report: * 668797635ab529ea21ef234a1f99747cfb4d898a Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158015561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7006) * c4207aa80f6279d013a51b9104997e840716640e UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093 ## CI report: * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN * 49d5d3e1e8e1bf962716241b79db1f2e28e506b4 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158188400) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7040) * a8335f0def293f04fd74bfc47c1b31fc98a46a23 UNKNOWN
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403225047 ## File path: docs/concepts/distributed_architecture.md ## @@ -0,0 +1,99 @@ +--- +title: Distributed Architecture +nav-id: dist-arch +nav-pos: 2 Review comment: I think this should come after the logical functions page, what do you think? ```suggestion nav-pos: 3 ```
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)
[GitHub] [flink] Myasuka commented on a change in pull request #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
Myasuka commented on a change in pull request #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider URL: https://github.com/apache/flink/pull/11624#discussion_r403207707 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java ## @@ -258,6 +258,10 @@ protected OperatorStateBackend operatorStateBackend( } } + protected TtlTimeProvider getTtlTimeProvider() { Review comment: There is another reason why we did not change the constructor of `StreamTaskStateInitializerImpl`. The current `AbstractStreamOperatorTestHarness` is not created from a builder, and once a new `AbstractStreamOperatorTestHarness` is created, the inner `streamTaskStateInitializer` has already been created with the default `TtlTimeProvider`. Even if we set a TTL time provider on `AbstractStreamOperatorTestHarness` later, the inner `streamTaskStateInitializer` would not notice the changed provider unless we call `AbstractStreamOperatorTestHarness#setup` to re-create it. However, `AbstractStreamOperatorTestHarness#setup` actually calls the deprecated `SetupableStreamOperator#setup` interface. In a nutshell, unless we refactor how we build `AbstractStreamOperatorTestHarness`, making a customized TTL time provider take effect requires calling `AbstractStreamOperatorTestHarness#setup` each time, even though that interface is already treated as deprecated.
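The ordering problem described here can be sketched outside Flink: if an inner component captures a time provider at construction, setting a new provider on the outer harness afterwards has no effect unless the inner component is re-created. A minimal standalone analogue (the class names below are illustrative stand-ins, not Flink's actual API):

```java
/** Illustrative analogue of a TTL time provider: supplies the "current" time. */
interface TimeProvider {
    long currentTimestamp();
}

/** Stand-in for the state initializer: captures the provider at construction. */
class StateInitializer {
    private final TimeProvider timeProvider;

    StateInitializer(TimeProvider timeProvider) {
        this.timeProvider = timeProvider;
    }

    long now() {
        return timeProvider.currentTimestamp();
    }
}

/** Stand-in for the test harness. */
class Harness {
    // Created eagerly with the default (system) provider, like the inner
    // streamTaskStateInitializer in the harness constructor.
    private StateInitializer initializer = new StateInitializer(System::currentTimeMillis);

    void setTimeProvider(TimeProvider provider) {
        // Without this re-creation (the analogue of calling harness#setup()),
        // the initializer would keep the provider captured at construction
        // time and never see the customized one.
        this.initializer = new StateInitializer(provider);
    }

    long stateTime() {
        return initializer.now();
    }
}

public class TtlProviderDemo {
    public static void main(String[] args) {
        Harness harness = new Harness();
        harness.setTimeProvider(() -> 42L); // a fixed fake clock
        System.out.println(harness.stateTime()); // prints 42
    }
}
```

The sketch shows why a builder-style construction (passing the provider before anything captures it) avoids the awkward "must call setup() again" requirement.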
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403193426 ## File path: docs/concepts/distributed_architecture.md ## @@ -0,0 +1,99 @@ +--- +title: Distributed Architecture +nav-id: dist-arch +nav-pos: 2 +nav-title: Architecture +nav-parent_id: concepts +--- + + +A Stateful Functions deployment consists of a few components interacting together. Here we describe these pieces and their Review comment: You're missing the second half of this sentence.
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403198004 ## File path: docs/concepts/distributed_architecture.md ## @@ -0,0 +1,99 @@ +--- +title: Distributed Architecture +nav-id: dist-arch +nav-pos: 2 +nav-title: Architecture +nav-parent_id: concepts +--- + + +A Stateful Functions deployment consists of a few components interacting together. Here we describe these pieces and their + +* This will be replaced by the TOC +{:toc} + +## High-level View + +A *Stateful Functions* deployment consists of a set of **Apache Flink Stateful Functions** processes and, optionally, various deployments that execute remote functions. + + + + + +The Flink worker processes (TaskManagers) receive the events from the ingress systems (Kafka, Kinesis, etc.) and route them to the target functions. They invoke the functions, and route the resulting mesages to the next respective target functions. Messages designated for egress are written to an egress system (again, Kafka, Kinesis, ...). Review comment: ```suggestion The Flink worker processes (TaskManagers) receive the events from the ingress systems (Kafka, Kinesis, etc.) and route them to the target functions. They invoke the functions and route the resulting messages to the next respective target functions. Messages designated for egress are written to an egress system (again, Kafka, Kinesis, ...). ```
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403196811 ## File path: docs/concepts/distributed_architecture.md ## @@ -0,0 +1,99 @@ +--- +title: Distributed Architecture +nav-id: dist-arch +nav-pos: 2 +nav-title: Architecture +nav-parent_id: concepts +--- + + +A Stateful Functions deployment consists of a few components interacting together. Here we describe these pieces and their + +* This will be replaced by the TOC +{:toc} + +## High-level View + +A *Stateful Functions* deployment consists of a set of **Apache Flink Stateful Functions** processes and, optionally, various deployments that execute remote functions. + + + + + +The Flink worker processes (TaskManagers) receive the events from the ingress systems (Kafka, Kinesis, etc.) and route them to the target functions. They invoke the functions, and route the resulting mesages to the next respective target functions. Messages designated for egress are written to an egress system (again, Kafka, Kinesis, ...). + +## Components + +The heavy lifting is done by the Apache Flink processes, which manage the state, handle the messaging, and invoke the stateful functions. +The Flink cluster consists typically of one master and multiple workers (TaskManagers). + + + + + +In addition to the Apache Flink processes, a full deployment requires [ZooKeeper](https://zookeeper.apache.org/) (for [master failover](https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html)) and bulk storage (S3, HDFS, NAS, GCS, Azure Blob Store, etc.) to store Flink's [checkpoints](https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html#checkpointing). In turn, the deployment requires no database, and Flink processes do not require persistent volumes. 
+ + ## Logical Co-location, Physical Separation + + A core principle of many Stream Processors is that application logic and the application state must be co-located. That approach is the basis for their out-of-the-box consistency. Stateful Functions takes a unique approach by *logically co-locating* state and compute while allowing them to be *physically separated*. + + - *Logical co-location:* Messaging, state access/updates and function invocations are managed tightly together, in the same way as in Flink's DataStream API. State is sharded by key, and messages are routed to the state by key. There is a single writer per key at a time, which also schedules the function invocations. + + - *Physical separation:* Functions can be executed remotely, with message and state access provided as part of the invocation request. This way, functions can be managed independently, like stateless processes. + + +## Deployment Styles for Functions + +The stateful functions themselves can be deployed in various ways that trade off certain properties against each other: loose coupling and independent scaling on the one hand, performance overhead on the other. Each module of functions can be of a different kind, so some functions can run remotely while others run embedded. + + Remote Functions + +*Remote Functions* use the above-mentioned principle of *physical separation* while maintaining *logical co-location*. The state/messaging tier (i.e., the Flink processes) and the function tier are deployed, managed, and scaled independently. + +Function invocations happen through an HTTP / gRPC protocol and go through a service that routes invocation requests to any available endpoint, for example a Kubernetes (load-balancing) service, the AWS request gateway for Lambda, etc. Because invocations are self-contained (contain message, state, access to timers, etc.) the target functions can be treated like any stateless application. 
+ + + + + + +Refer to the documentation on the [Python SDK]({{ site.baseurl }}/sdk/python.html) and [remote modules]({{ site.baseurl }}/sdk/modules.html#remote-module) for details. + + Co-located Functions + +An alternative way of deploying functions is *co-location* with the Flink JVM processes. In such a setup, each Flink TaskManager would talk to one Function process sitting *"next to it"*. A common way to do this is to use a system like Kubernetes and deploy pods consisting of a Flink container and the function side-car container; the two communicate via the pod-local network. + +This mode supports different languages while avoiding to route invocations through a Service/LoadBalancer, but it cannot scale the state and compute parts independently. + + + + + +This style of deployment is similar to how Flink's Table API and API Beam's portability layer and deploy execute non-JVM functions. Review comment: ```suggestion This style of deployment is similar to how Flink's Table API and API Beam's portability
[GitHub] [flink] bowenli86 commented on a change in pull request #11538: [FLINK-16813][jdbc] JDBCInputFormat doesn't correctly map Short
bowenli86 commented on a change in pull request #11538: [FLINK-16813][jdbc] JDBCInputFormat doesn't correctly map Short URL: https://github.com/apache/flink/pull/11538#discussion_r403184113 ## File path: flink-core/src/main/java/org/apache/flink/util/StringUtils.java ## @@ -139,8 +139,16 @@ public static String arrayToString(Object array) { if (array instanceof long[]) { return Arrays.toString((long[]) array); } - if (array instanceof Object[]) { - return Arrays.toString((Object[]) array); + // for array of byte array + if (array instanceof byte[][]) { Review comment: maybe it should be added later, I haven't seen a case yet. We need byte[][] as pg supports array of byte array.
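The motivation for the `byte[][]` special case can be shown with a quick standalone illustration (not the Flink code itself): falling through to `Arrays.toString((Object[]) array)` prints nested primitive arrays via their default identity `toString()`, while `Arrays.deepToString` renders the elements.

```java
import java.util.Arrays;

public class ArrayToStringDemo {
    public static void main(String[] args) {
        byte[][] bytes = {{1, 2}, {3}};

        // Treating the array as Object[] falls back to byte[]'s identity
        // toString(), producing something like "[[B@7852e922, [B@4e25154f]".
        System.out.println(Arrays.toString((Object[]) bytes));

        // deepToString recurses into the nested primitive arrays.
        System.out.println(Arrays.deepToString(bytes)); // prints [[1, 2], [3]]
    }
}
```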
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)
[GitHub] [flink] flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#issuecomment-599949839 ## CI report: * e5e11418358bf450d1cca543916bbd7d695375b1 UNKNOWN * ad481f5d846621032feb21e409690eed5b114191 UNKNOWN * 2a76c7689c032056a462a615c28f27f1361f1f0e Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158236401) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7045)
[jira] [Created] (FLINK-16978) Update flink-statefun README.md for Stateful Functions 2.0.
Marta Paes Moreira created FLINK-16978: -- Summary: Update flink-statefun README.md for Stateful Functions 2.0. Key: FLINK-16978 URL: https://issues.apache.org/jira/browse/FLINK-16978 Project: Flink Issue Type: Task Components: Stateful Functions Reporter: Marta Paes Moreira Assignee: Marta Paes Moreira Updating the README in the repository to fit the changes of Stateful Functions 2.0. Will also update the README in the statefun-python-sdk directory. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-9173) RestClient - Received response is abnormal
[ https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074738#comment-17074738 ] François Lacombe edited comment on FLINK-9173 at 4/3/20, 5:06 PM: -- This issue is still alive with Flink 1.10 and Spring Boot 2.2.5. I had to deal with it recently and colleagues of mine found this workaround. The root cause is a wrong classloader selection due to a Spring Boot packaging flaw. In my pom.xml, I replaced:
{code:xml}
<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
{code}
with:
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <executions>
    <execution>
      <id>copy-dependencies</id>
      <phase>prepare-package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>${project.build.directory}/libs</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <classpathPrefix>libs/</classpathPrefix>
        <mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
{code}
It will produce a jar file with libs in a side directory instead of a fat jar. Hope it will help and eventually be fixed in a next release. All the best was (Author: flacombe): This issue is still alive with Flink 1.10 and Springboot 2.2.5. I had to deal with it recently and colleagues of mine find out this workdaround: Root cause comes from wrong classloader selection due to some SpringBoot packaging flaw. In my pom.xml, I replaced : {code:java} org.springframework.boot spring-boot-maven-plugin {code} by: {{}} {code:java} org.apache.maven.plugins maven-dependency-plugin copy-dependencies prepare-package copy-dependencies ${project.build.directory}/libs org.apache.maven.plugins maven-jar-plugin true libs/ org.baeldung.executable.ExecutableMavenJar {code} {{}} It will produce a jar file with libs in a side directry instead of a fat-jar. 
Hope it will help and eventually be fixed in a next release All the best > RestClient - Received response is abnormal > -- > > Key: FLINK-9173 > URL: https://issues.apache.org/jira/browse/FLINK-9173 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Affects Versions: 1.5.0 > Environment: OS: CentOS 6.8 > JAVA: 1.8.0_161-b12 > maven-plugin: spring-boot-maven-plugin > Spring-boot: 1.5.10.RELEASE >Reporter: Bob Lau >Priority: Major > Attachments: image-2018-04-17-14-09-33-991.png > > > The system prints the exception log as follows: > > {code:java} > //代码占位符 > 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR > o.a.flink.runtime.rest.RestClient - Received response was neither of the > expected type ([simple type, class > org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) > nor an error. > Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968 > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: > Unrecognized field "status" (class > org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as > ignorable (one known property: "errors"]) > at [Source: N/A; line: -1, column: -1] (through reference chain: > org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"]) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392) > at > 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133) > at >
[jira] [Commented] (FLINK-9173) RestClient - Received response is abnormal
[ https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074738#comment-17074738 ] François Lacombe commented on FLINK-9173: - This issue is still alive with Flink 1.10 and Spring Boot 2.2.5. I had to deal with it recently and colleagues of mine found this workaround. The root cause is a wrong classloader selection due to a Spring Boot packaging flaw. In my pom.xml, I replaced:
{code:xml}
<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
{code}
with:
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <executions>
    <execution>
      <id>copy-dependencies</id>
      <phase>prepare-package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>${project.build.directory}/libs</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <classpathPrefix>libs/</classpathPrefix>
        <mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
{code}
It will produce a jar file with libs in a side directory instead of a fat jar. Hope it will help and eventually be fixed in a next release. All the best > RestClient - Received response is abnormal > -- > > Key: FLINK-9173 > URL: https://issues.apache.org/jira/browse/FLINK-9173 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Affects Versions: 1.5.0 > Environment: OS: CentOS 6.8 > JAVA: 1.8.0_161-b12 > maven-plugin: spring-boot-maven-plugin > Spring-boot: 1.5.10.RELEASE >Reporter: Bob Lau >Priority: Major > Attachments: image-2018-04-17-14-09-33-991.png > > > The system prints the exception log as follows: > > {code:java} > //代码占位符 > 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR > o.a.flink.runtime.rest.RestClient - Received response was neither of the > expected type ([simple type, class > org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) > nor an error. 
> Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968 > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: > Unrecognized field "status" (class > org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as > ignorable (one known property: "errors"]) > at [Source: N/A; line: -1, column: -1] (through reference chain: > org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"]) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3779) > at > 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2050) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2547) > at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:210) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > > In the development
[GitHub] [flink-statefun] StephanEwen opened a new pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture
StephanEwen opened a new pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture URL: https://github.com/apache/flink-statefun/pull/95 This adds documentation for the components of a distributed deployment. ![image](https://user-images.githubusercontent.com/1727146/78386265-8b173e80-75dd-11ea-9dfb-856f29279159.png) ![image](https://user-images.githubusercontent.com/1727146/78386302-a124ff00-75dd-11ea-9726-a6e292b26079.png)
[jira] [Updated] (FLINK-16977) Add Architecture docs for Stateful Functions
[ https://issues.apache.org/jira/browse/FLINK-16977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16977: --- Labels: pull-request-available (was: ) > Add Architecture docs for Stateful Functions > > > Key: FLINK-16977 > URL: https://issues.apache.org/jira/browse/FLINK-16977 > Project: Flink > Issue Type: Improvement > Components: Stateful Functions >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 2.1.0 > > > We should add a section to the documentation describing the distributed setup > and architecture of a Stateful Functions applications.
[jira] [Created] (FLINK-16977) Add Architecture docs for Stateful Functions
Stephan Ewen created FLINK-16977:

Summary: Add Architecture docs for Stateful Functions
Key: FLINK-16977
URL: https://issues.apache.org/jira/browse/FLINK-16977
Project: Flink
Issue Type: Improvement
Components: Stateful Functions
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 2.1.0, 2.0.0

We should add a section to the documentation describing the distributed setup and architecture of a Stateful Functions application.
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052

## CI report:

* 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
* 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
* f128770ed2a5816a1f460734f736fd07b273e896 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043)
* cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403089669 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + 
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int requiredTaskManagers = getPendingWorkerNums().get(workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.getNum(workerResourceSpec); Review comment: I think I would hide `pendingWorkerCounter` behind some methods which the base class provides. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
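The diff above replaces the single `numPendingPodRequests` integer with a counter keyed by `WorkerResourceSpec`, which the reviewer suggests hiding behind base-class methods. A minimal, self-contained sketch of such a per-spec counter follows; the class and method names mirror the `increaseAndGet`/`getNum` calls visible in the diff, but the `String` key and everything else here is an illustrative stand-in, not Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a pending-worker counter keyed by worker spec.
// Strings stand in for WorkerResourceSpec.
class PendingWorkerCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Increment the pending count for a worker spec and return the new value.
    int increaseAndGet(String workerSpec) {
        return counts.merge(workerSpec, 1, Integer::sum);
    }

    // Decrement without going below zero; remove the entry when it hits zero.
    int decreaseAndGet(String workerSpec) {
        Integer updated = counts.compute(workerSpec,
            (k, v) -> (v == null || v <= 1) ? null : v - 1);
        return updated == null ? 0 : updated;
    }

    int getNum(String workerSpec) {
        return counts.getOrDefault(workerSpec, 0);
    }

    int getTotalNum() {
        return counts.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        PendingWorkerCounter counter = new PendingWorkerCounter();
        counter.increaseAndGet("1024mb-1vcore");
        counter.increaseAndGet("1024mb-1vcore");
        counter.increaseAndGet("2048mb-2vcore");
        System.out.println(counter.getNum("1024mb-1vcore")); // 2
        System.out.println(counter.getTotalNum());           // 3
    }
}
```

Keeping the map private and exposing only these methods is the encapsulation the review comment asks for: subclasses never touch the counter directly.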
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403128803

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ##

@@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
 	/**
 	 * Request new container if pending containers cannot satisfy pending slot requests.
 	 */
-	private void requestYarnContainerIfRequired() {
-		int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-		if (requiredTaskManagers > numPendingContainerRequests) {
-			requestYarnContainer();
-		}
+	private void requestYarnContainerIfRequired(Resource containerResource) {
+		getPendingWorkerNums().entrySet().stream()
+			.filter(entry ->
+				getContainerResource(entry.getKey()).equals(containerResource) &&
+				entry.getValue() > pendingWorkerCounter.getNum(entry.getKey()))
+			.findAny()

Review comment: Shouldn't we do this for all instead of any?
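The reviewer's point is that `findAny()` acts on at most one matching entry, whereas iterating the filtered stream acts on every spec that still needs containers. The sketch below illustrates the "all" variant; the method name, the `String` keys, and the predicate are hypothetical stand-ins, not Flink's actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FindAnyVsForEach {
    // Collect every spec whose required count exceeds its pending count,
    // instead of stopping at the first match as findAny() would.
    static List<String> specsNeedingContainers(Map<String, Integer> required,
                                               Map<String, Integer> pending) {
        return required.entrySet().stream()
            .filter(e -> e.getValue() > pending.getOrDefault(e.getKey(), 0))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> required = new LinkedHashMap<>();
        required.put("small", 3);
        required.put("large", 2);
        // "small" still needs 2 more containers; "large" is satisfied.
        Map<String, Integer> pending = Map.of("small", 1, "large", 2);
        System.out.println(specsNeedingContainers(required, pending)); // [small]
    }
}
```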
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403115740 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainerStatus.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; + +/** + * A {@link ContainerStatus} implementation for testing. 
+ */
+class TestingContainerStatus extends ContainerStatus {
+
+	private final ContainerId containerId;
+	private final ContainerState containerState;
+	private final String diagnostics;
+	private final int exitStatus;
+
+	TestingContainerStatus(
+			final ContainerId containerId,
+			final ContainerState containerState,
+			final String diagnostics,
+			final int exitStatus) {
+
+		this.containerId = containerId;
+		this.containerState = containerState;
+		this.diagnostics = diagnostics;
+		this.exitStatus = exitStatus;
+	}
+
+	@Override
+	public ContainerId getContainerId() {
+		return containerId;
+	}
+
+	@Override
+	public void setContainerId(ContainerId containerId) {
+

Review comment: Shouldn't we fail in case someone calls `setContainerId` here? Otherwise it might go unnoticed and result in some strange behaviour/failure.
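The fail-fast behaviour the reviewer asks for can be sketched in a few lines. `ImmutableStatus` below is a hypothetical stand-in for `TestingContainerStatus` (without the Hadoop types), showing the pattern of throwing from an unsupported mutator so an accidental mutation surfaces in tests instead of going unnoticed.

```java
// Illustrative sketch, not the actual TestingContainerStatus from the PR.
class ImmutableStatus {
    private final String containerId;

    ImmutableStatus(String containerId) {
        this.containerId = containerId;
    }

    String getContainerId() {
        return containerId;
    }

    // Failing loudly here makes an accidental mutation visible immediately,
    // rather than producing strange behaviour later.
    void setContainerId(String ignored) {
        throw new UnsupportedOperationException(
            "TestingContainerStatus is immutable; setContainerId is not supported");
    }

    public static void main(String[] args) {
        ImmutableStatus status = new ImmutableStatus("container_01");
        System.out.println(status.getContainerId()); // container_01
        try {
            status.setContainerId("container_02");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```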
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403121941 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; + +import java.util.function.Consumer; + +/** + * A Yarn {@link NMClientAsync} implementation for testing. 
+ */
+class TestingYarnNMClientAsync extends NMClientAsyncImpl {
+
+	private Consumer> startContainerAsyncConsumer = ignored -> {};
+	private Consumer> stopContainerAsyncConsumer = ignored -> {};

Review comment: I'd suggest to use the `TriConsumer` here.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403132471 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { /** * Request new container if pending containers cannot satisfy pending slot requests. */ - private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredTaskManagers(); - - if (requiredTaskManagers > numPendingContainerRequests) { - requestYarnContainer(); - } + private void requestYarnContainerIfRequired(Resource containerResource) { Review comment: Shouldn't we iterate over all resources which are needed instead of restricting it to `containerResource`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403133265 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactoryTest.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.entrypoint; + +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.junit.Test; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link YarnResourceManagerFactory}. + */ +public class YarnResourceManagerFactoryTest { Review comment: ```suggestion public class YarnResourceManagerFactoryTest extends TestLogger { ``` This is an automated message from the Apache Git Service. 
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403090988 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + 
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int requiredTaskManagers = getPendingWorkerNums().get(workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.getNum(workerResourceSpec); - if (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + if (requiredTaskManagers > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName()); - final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(new ResourceID(pod.getName())); - if (kubernetesWorkerNode != null) { - requestKubernetesPodIfRequired(); + final WorkerResourceSpec workerResourceSpec = removeWorkerNodeAndResourceSpec(new ResourceID(pod.getName())); + if (workerResourceSpec != null) { + requestKubernetesPodIfRequired(workerResourceSpec);
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403118223 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */
+public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl {

Review comment: `AMRMClientAsyncImpl` is annotated as unstable. I'm not sure how well the testing implementation will work across different Yarn versions. Have we tried this out?
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403105830

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ##

@@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) {
 		//noinspection NumericCastThatLosesPrecision
 		return cpuCoresLong;
 	}
+
+	/**
+	 * Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}.
+	 */
+	@VisibleForTesting
+	static class WorkerSpecContainerResourceAdapter {
+		private final Configuration flinkConfig;
+		private final int minMemMB;
+		private final int minVcore;
+		private final boolean matchVcores;
+		private final Map workerSpecToContainerResource;
+		private final Map> containerResourceToWorkerSpecs;

Review comment: If we are interested in every `WorkerResourceSpec` which ever resulted into a given `Resource`, then I would suggest to change the value type to `List`. Otherwise one could instantiate this field with a `Set` which has different semantics.
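The `List`-versus-`Set` distinction the reviewer raises is easy to demonstrate: a map declared with a general collection value type can be instantiated with `Set`s, silently deduplicating specs that map to the same container resource. The keys and values below are illustrative strings, not the real `Resource`/`WorkerResourceSpec` types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ListVsSetValues {
    // List values keep every spec that ever mapped to the resource.
    static Map<String, List<String>> withListValues(String[] specs) {
        Map<String, List<String>> m = new HashMap<>();
        for (String spec : specs) {
            m.computeIfAbsent("1024mb", k -> new ArrayList<>()).add(spec);
        }
        return m;
    }

    // Set values silently collapse duplicate specs.
    static Map<String, Set<String>> withSetValues(String[] specs) {
        Map<String, Set<String>> m = new HashMap<>();
        for (String spec : specs) {
            m.computeIfAbsent("1024mb", k -> new HashSet<>()).add(spec);
        }
        return m;
    }

    public static void main(String[] args) {
        String[] specs = {"spec-a", "spec-a", "spec-b"};
        System.out.println(withListValues(specs).get("1024mb").size()); // 3, duplicates kept
        System.out.println(withSetValues(specs).get("1024mb").size());  // 2, duplicate "spec-a" collapsed
    }
}
```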
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403113398 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainer.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; + +/** + * A {@link Container} implementation for testing. + */ +class TestingContainer extends Container { Review comment: I like the idea of getting rid of Mockito but how do we ensure that this works with all Hadoop versions? Looking at Hadoop 2.10. https://github.com/apache/hadoop/blob/release-2.10.0-RC1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java it looks as if the container has gotten some more methods. 
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403088133 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: Should we add a check state to ensure that we fail in case that we request a different size? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403078672 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerTest.java ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests for {@link ActiveResourceManager}. + */ +public class ActiveResourceManagerTest { Review comment: ```suggestion public class ActiveResourceManagerTest extends TestLogger { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403126015 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -363,31 +356,64 @@ public void onContainersCompleted(final List statuses) { @Override public void onContainersAllocated(List containers) { runAsync(() -> { - log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests); - final Collection pendingRequests = getPendingRequests(); - final Iterator pendingRequestsIterator = pendingRequests.iterator(); + log.info("Received {} containers.", containers.size()); - // number of allocated containers can be larger than the number of pending container requests - final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests); - final List requiredContainers = containers.subList(0, numAcceptedContainers); - final List excessContainers = containers.subList(numAcceptedContainers, containers.size()); - - for (int i = 0; i < requiredContainers.size(); i++) { - removeContainerRequest(pendingRequestsIterator.next()); - } - - excessContainers.forEach(this::returnExcessContainer); - requiredContainers.forEach(this::startTaskExecutorInContainer); + groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated); // if we are waiting for no further containers, we can go to the // regular heartbeat interval - if (numPendingContainerRequests <= 0) { + if (pendingWorkerCounter.getTotalNum() <= 0) { resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); } }); } - private void startTaskExecutorInContainer(Container container) { + private Map> groupContainerByResource(List containers) { + return containers.stream().collect(Collectors.groupingBy(Container::getResource)); + } + + private void 
onContainersOfResourceAllocated(Resource resource, List containers) { + final List pendingWorkerResourceSpecs = + workerSpecContainerResourceAdapter.getWorkerSpecs(resource).stream() Review comment: Can it happen that `getWorkerSpecs(resource)` returns a list which contains a `WorkerResourceSpec` twice? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
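The grouping step in this diff can be illustrated with plain JDK types. A minimal sketch: `Container` below is a hypothetical stand-in record, not the Hadoop class, and the resource profile is reduced to a string key.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByResourceSketch {

    // Hypothetical stand-in for a Yarn Container, keyed by its resource profile.
    record Container(String id, String resource) {}

    // Mirrors the groupContainerByResource helper in the diff above: containers
    // with the same resource profile land in one bucket, so each bucket can be
    // matched against the pending requests of that profile.
    static Map<String, List<Container>> groupByResource(List<Container> containers) {
        return containers.stream().collect(Collectors.groupingBy(Container::resource));
    }
}
```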
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403099591 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final boolean matchVcores; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final boolean matchVcores) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.matchVcores = matchVcores; + workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) { + return workerSpecToContainerResource.computeIfAbsent( + Preconditions.checkNotNull(workerResourceSpec), + this::createAndMapContainerResource); + } + + @VisibleForTesting + Collection getWorkerSpecs(final Resource containerResource) { + return getEquivalentContainerResource(containerResource).stream() + .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream()) + 
.collect(Collectors.toList()); } + + @VisibleForTesting + Collection getEquivalentContainerResource(final Resource containerResource) { + // Yarn might ignore the requested vcores, depending on its configurations. + // In such cases, we should also not match vcores. + return matchVcores ? + Collections.singletonList(containerResource) : + containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), Collections.emptyList()); + } + + private Resource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: I would suggest adding a `checkState` to ensure that we fail once dynamic worker resources are enabled.
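The question asked earlier in this thread (can `getWorkerSpecs` return the same spec twice?) comes down to how `computeIfAbsent` maintains the two maps. A minimal sketch under assumed simplifications: worker specs are plain strings, container resources are normalized memory sizes, and only the method names mirror the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SpecResourceAdapterSketch {

    private final int minMemMB;
    // Forward map: each spec is normalized at most once, thanks to computeIfAbsent.
    private final Map<String, Integer> specToResource = new HashMap<>();
    // Reverse map: several distinct specs may normalize to the same resource.
    private final Map<Integer, List<String>> resourceToSpecs = new HashMap<>();

    SpecResourceAdapterSketch(int minMemMB) {
        this.minMemMB = minMemMB;
    }

    // Mirrors getContainerResource + createAndMapContainerResource: the mapping
    // lambda runs only on the first call per spec, so the reverse map cannot
    // accumulate duplicates for a given spec.
    int getContainerResource(String workerSpec, int requestedMemMB) {
        return specToResource.computeIfAbsent(workerSpec, spec -> {
            int normalized = Math.max(requestedMemMB, minMemMB);
            resourceToSpecs.computeIfAbsent(normalized, r -> new ArrayList<>()).add(spec);
            return normalized;
        });
    }

    List<String> getWorkerSpecs(int containerResource) {
        return resourceToSpecs.getOrDefault(containerResource, Collections.emptyList());
    }
}
```

Two specs below the 100 MB minimum both normalize to a 100 MB container, so `getWorkerSpecs(100)` returns both; repeated lookups of the same spec do not add it twice.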
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403130352 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { /** * Request new container if pending containers cannot satisfy pending slot requests. */ - private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredTaskManagers(); - - if (requiredTaskManagers > numPendingContainerRequests) { - requestYarnContainer(); - } + private void requestYarnContainerIfRequired(Resource containerResource) { + getPendingWorkerNums().entrySet().stream() Review comment: I have to admit that I find `getPendingWorkerNums()` and `pendingWorkerCounter` quite confusing. They sound almost the same, but the former means the requirements of the `SlotManager` and the latter the currently pending workers which have been requested by the RM.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403104873 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final boolean matchVcores; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final boolean matchVcores) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.matchVcores = matchVcores; + workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) { + return workerSpecToContainerResource.computeIfAbsent( + Preconditions.checkNotNull(workerResourceSpec), + this::createAndMapContainerResource); + } + + @VisibleForTesting + Collection getWorkerSpecs(final Resource containerResource) { + return getEquivalentContainerResource(containerResource).stream() + .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream()) + 
.collect(Collectors.toList()); } + + @VisibleForTesting + Collection getEquivalentContainerResource(final Resource containerResource) { Review comment: If we are only interested in the equivalence class, then I would suggest changing the return type from `Collection` to `Set`.
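Returning a `Set` for the equivalence class, as suggested, can be sketched like this. The `Resource` record is a hypothetical stand-in, and the memory-only equivalence mirrors the `matchVcores` branch in the diff:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class EquivalenceClassSketch {

    // Hypothetical stand-in for a Yarn Resource.
    record Resource(int memMB, int vcores) {}

    private final Map<Integer, Set<Resource>> memoryToResources = new HashMap<>();

    void register(Resource resource) {
        memoryToResources.computeIfAbsent(resource.memMB(), m -> new HashSet<>()).add(resource);
    }

    // A Set makes the equivalence-class semantics explicit: no duplicates,
    // cheap membership tests, and the return type documents the intent.
    Set<Resource> getEquivalentResources(Resource resource, boolean matchVcores) {
        return matchVcores
            ? Collections.singleton(resource)
            : memoryToResources.getOrDefault(resource.memMB(), Collections.emptySet());
    }
}
```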
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403086625 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode worker) { @Override public void onAdded(List pods) { runAsync(() -> { - for (KubernetesPod pod : pods) { - if (numPendingPodRequests > 0) { - numPendingPodRequests--; + pods.forEach(pod -> { + WorkerResourceSpec workerResourceSpec = podWorkerResources.get(pod.getName()); + final int pendingNum = pendingWorkerCounter.getNum(workerResourceSpec); + if (pendingNum > 0) { + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: I'm wondering whether this logic shouldn't go into the `ActiveResourceManager`. I would expect that all `ActiveResourceManager` implementations would need to do something similar. Maybe we could introduce `notifyNewWorkerStarted(WorkerResourceSpec)`. This could also have the benefit that we could hide `pendingWorkerCounter` completely.
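The hook proposed in the comment above might look roughly like the following. This is a speculative sketch, not Flink code: `notifyNewWorkerStarted` is the suggested name, and the generic `SpecT` stands in for `WorkerResourceSpec`.

```java
import java.util.HashMap;
import java.util.Map;

// Speculative sketch: the base class owns the pending-worker bookkeeping,
// so subclasses (Yarn, Kubernetes) only report events and never touch the counter.
abstract class ActiveResourceManagerSketch<SpecT> {

    private final Map<SpecT, Integer> pendingWorkers = new HashMap<>();

    protected final void notifyNewWorkerRequested(SpecT spec) {
        pendingWorkers.merge(spec, 1, Integer::sum);
    }

    // The proposed hook: decrement the pending count for this spec, if any;
    // an unmatched start notification is simply ignored.
    protected final void notifyNewWorkerStarted(SpecT spec) {
        pendingWorkers.computeIfPresent(spec, (s, n) -> n > 1 ? n - 1 : null);
    }

    protected final int getPendingWorkerCount(SpecT spec) {
        return pendingWorkers.getOrDefault(spec, 0);
    }
}
```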
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403097275 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { Review comment: I think this class is large enough to warrant its own file. This would also decrease the size of this source code file a bit.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403131968 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId final ResourceID resourceId = new ResourceID(containerId.toString()); // release the failed container - workerNodeMap.remove(resourceId); + YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(containerId); // and ask for a new one - requestYarnContainerIfRequired(); + requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource()); } private void returnExcessContainer(Container excessContainer) { log.info("Returning excess container {}.", excessContainer.getId()); resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); } - private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { - numPendingContainerRequests--; - - log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests); - + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) { + log.info("Removing container request {}.", pendingContainerRequest); + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: Asked differently: when do we clean up `workerSpecContainerResourceAdapter`?
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403092773 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -176,6 +185,15 @@ MainThreadExecutor getMainThreadExecutorForTesting() { SlotManager getSlotManager() { return this.slotManager; } + + @Override + public Map getPendingWorkerNums() { + return customPendingWorkerNums != null ? customPendingWorkerNums : super.getPendingWorkerNums(); + } + + public void setCustomPendingWorkerNums(final Map customPendingWorkerNums) { + this.customPendingWorkerNums = customPendingWorkerNums; + } Review comment: I think this is pretty much whitebox testing, as it strongly relies on internal implementation details. I would recommend going another way: either rely on the public APIs of the component, or encapsulate the bookkeeping logic so that it can be tested separately.
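The encapsulation asked for above might look like this: a small, self-contained counter whose `increaseAndGet`/`decreaseAndGet`/`getNum`/`getTotalNum` names follow the diffs in this thread, testable without touching any resource manager internals. A sketch, not the actual Flink class:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an extracted bookkeeping class: its whole contract can be
// unit-tested directly, instead of overriding getPendingWorkerNums in a
// test subclass of the resource manager.
final class PendingWorkerCounterSketch<SpecT> {

    private final Map<SpecT, Integer> counts = new HashMap<>();

    int increaseAndGet(SpecT spec) {
        return counts.merge(spec, 1, Integer::sum);
    }

    // Never goes below zero; the entry is dropped when it reaches zero.
    int decreaseAndGet(SpecT spec) {
        Integer next = counts.computeIfPresent(spec, (s, n) -> n > 1 ? n - 1 : null);
        return next == null ? 0 : next;
    }

    int getNum(SpecT spec) {
        return counts.getOrDefault(spec, 0);
    }

    int getTotalNum() {
        return counts.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```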
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403123867 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -363,31 +356,64 @@ public void onContainersCompleted(final List statuses) { @Override public void onContainersAllocated(List containers) { runAsync(() -> { - log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests); - final Collection pendingRequests = getPendingRequests(); - final Iterator pendingRequestsIterator = pendingRequests.iterator(); + log.info("Received {} containers.", containers.size()); - // number of allocated containers can be larger than the number of pending container requests - final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests); - final List requiredContainers = containers.subList(0, numAcceptedContainers); - final List excessContainers = containers.subList(numAcceptedContainers, containers.size()); - - for (int i = 0; i < requiredContainers.size(); i++) { - removeContainerRequest(pendingRequestsIterator.next()); - } - - excessContainers.forEach(this::returnExcessContainer); - requiredContainers.forEach(this::startTaskExecutorInContainer); + groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated); Review comment: I'd suggest using the for-each loop.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403088473 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: Btw: where do we change the `Configuration`?
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403127546 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId final ResourceID resourceId = new ResourceID(containerId.toString()); // release the failed container - workerNodeMap.remove(resourceId); + YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(containerId); // and ask for a new one - requestYarnContainerIfRequired(); + requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource()); } private void returnExcessContainer(Container excessContainer) { log.info("Returning excess container {}.", excessContainer.getId()); resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); } - private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { - numPendingContainerRequests--; - - log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests); - + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) { + log.info("Removing container request {}.", pendingContainerRequest); + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: What about removing it from `workerSpecContainerResourceAdapter`?
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403107046 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java ## @@ -34,4 +37,24 @@ .stringType() .noDefaultValue() .withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j."); + + /** +* **DO NOT USE** Whether {@link YarnResourceManager} should match the vcores of allocated containers with those requested. +* +* By default, Yarn ignores vcores in the container requests, and always allocates 1 vcore for each container. +* Only if 'yarn.scheduler.capacity.resource-calculator' is set to 'DominantResourceCalculator' for Yarn will it +* allocate container vcores as requested. Unfortunately, this configuration option is dedicated to the Yarn scheduler, +* and is only accessible to applications in Hadoop 2.6+. +* +* ATM, it should be fine to not match vcores, because with the current {@link SlotManagerImpl} all the TM +* containers should have the same resources. +* +* If later we add another {@link SlotManager} implementation that may have TMs with different resources, we can +* switch this option on only for the new SM, and the new SM can also be available on Hadoop 2.6+ only. +*/ + public static final ConfigOption MATCH_CONTAINER_VCORES = + key("$internal.yarn.resourcemanager.enable-vcore-matching") + .booleanType() + .defaultValue(false) + .withDescription("**DO NOT USE** Whether YarnResourceManager should match the container vcores."); Review comment: Does this mean that one has to configure one's Flink cluster depending on the configuration of the Yarn cluster? What happens if one forgets about Flink?
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403108543 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -584,6 +590,85 @@ public void testGetCpuExceedMaxInt() throws Exception { }}; } + @Test + public void testWorkerSpecContainerResourceAdapter_MatchVcores() { + final int minMemMB = 100; + final int minVcore = 10; + final YarnResourceManager.WorkerSpecContainerResourceAdapter adapter = + new YarnResourceManager.WorkerSpecContainerResourceAdapter( + getConfigProcessSpecEqualsWorkerSpec(), minMemMB, minVcore, true); Review comment: It is usually easier to understand if one uses an enum instead of a boolean, because one can give the different values expressive names (e.g. `MATCH_VCORES`, `IGNORE_VCORES`).
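The enum suggestion might be sketched as follows. `MATCH_VCORES`/`IGNORE_VCORES` are the names from the comment; the `Resource` record and the comparison logic are illustrative stand-ins:

```java
public class MatchingStrategySketch {

    // An enum gives the constructor argument a self-documenting name at the
    // call site, unlike a bare `true`/`false`.
    enum VcoreMatchingStrategy { MATCH_VCORES, IGNORE_VCORES }

    // Hypothetical stand-in for a Yarn Resource.
    record Resource(int memMB, int vcores) {}

    static boolean isEquivalent(Resource a, Resource b, VcoreMatchingStrategy strategy) {
        boolean memEqual = a.memMB() == b.memMB();
        return strategy == VcoreMatchingStrategy.MATCH_VCORES
            ? memEqual && a.vcores() == b.vcores()
            : memEqual;
    }
}
```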
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403118602 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { + + private Function, List>> + getMatchingRequestsFunction = ignored -> Collections.emptyList(); + private Consumer> addContainerRequestConsumer = ignored -> {}; + private Consumer> removeContainerRequestConsumer = ignored -> {}; + private Consumer> releaseAssignedContainerConsumer = ignored -> {}; Review comment: I'd suggest using the `BiConsumer` here.
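Applied to this stub pattern, the `BiConsumer` suggestion might look like the sketch below. Everything here is a simplified stand-in (`String` for the request, `List<String>` for the handler's log), not the real Yarn types:

```java
import java.util.List;
import java.util.function.BiConsumer;

public class TestingClientSketch {

    // Default behavior is a no-op; a test overrides it per case via the setter,
    // instead of writing a new subclass for every scenario. The BiConsumer lets
    // the override see both the request and the handler context.
    private BiConsumer<String, List<String>> addContainerRequestConsumer = (request, log) -> {};

    void setAddContainerRequestConsumer(BiConsumer<String, List<String>> consumer) {
        this.addContainerRequestConsumer = consumer;
    }

    // The production code path under test would call this method.
    void addContainerRequest(String request, List<String> handlerLog) {
        addContainerRequestConsumer.accept(request, handlerLog);
    }
}
```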
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403132911 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactoryTest.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.entrypoint; + +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; + +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link KubernetesResourceManagerFactory}. + */ +public class KubernetesResourceManagerFactoryTest { Review comment: ```suggestion public class KubernetesResourceManagerFactoryTest extends TestLogger { ```
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403093997 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -321,6 +339,47 @@ public void testGetCpuCoresNumSlots() { assertThat(resourceManager.getCpuCores(configuration), is(3.0)); } + @Test + public void testStartAndRecoverVariousResourceSpec() { + // Start two workers with different resources + final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec(1.0, 100, 0, 100, 100); + final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec(1.0, 99, 0, 100, 100); + resourceManager.startNewWorker(workerResourceSpec1); + resourceManager.startNewWorker(workerResourceSpec2); + + // Verify two pods with both worker resources are started + final PodList initialPodList = kubeClient.pods().list(); + assertEquals(2, initialPodList.getItems().size()); + final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20)); + + // Notify resource manager about pods added. + final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1); + final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2); + resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2)); + + // Terminate pod1. + terminatePod(initialPod1); + resourceManager.setCustomPendingWorkerNums(Collections.singletonMap(workerResourceSpec1, 1)); Review comment: I would recommend to not test the component like this. 
It requires detailed knowledge of the component's internals and makes the component harder to evolve, because the test relies on the fact that the `KubernetesResourceManager` keeps a map from `WorkerResourceSpec` to `Integer`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403083789 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode worker) { @Override public void onAdded(List pods) { runAsync(() -> { - for (KubernetesPod pod : pods) { - if (numPendingPodRequests > 0) { - numPendingPodRequests--; + pods.forEach(pod -> { Review comment: Call me old fashioned, but I think the for-each loop `for (KubernetesPod pod: pods)` is superior to `forEach`.
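The diff under review hints at a concrete reason behind this preference: the loop body decrements a counter (`numPendingPodRequests`), and a lambda passed to `forEach` can only capture effectively-final locals. A minimal sketch of the contrast, with the pod type simplified to `String` (the class and method names here are illustrative, not Flink's actual code):

```java
import java.util.List;

class IterationStyles {

    static int forEachLoop(List<String> pods) {
        int pending = 3;
        // Classic for-each: plain statements, easy to step through,
        // and a local counter can be mutated directly.
        for (String pod : pods) {
            if (pending > 0) {
                pending--;
            }
        }
        return pending;
    }

    static int lambdaForEach(List<String> pods) {
        // Lambda forEach: captured locals must be effectively final,
        // so a mutable counter needs a workaround such as a one-element array.
        int[] pending = {3};
        pods.forEach(pod -> {
            if (pending[0] > 0) {
                pending[0]--;
            }
        });
        return pending[0];
    }
}
```

Both methods compute the same result; the for-each version simply avoids the array-cell workaround, which is the kind of clarity the reviewer is pointing at.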
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403121671 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; + +import java.util.function.Consumer; + +/** + * A Yarn {@link NMClientAsync} implementation for testing. + */ +class TestingYarnNMClientAsync extends NMClientAsyncImpl { Review comment: Same here `NMClientAsyncImpl` seems to be unstable and might change depending on the used Yarn version. 
[jira] [Closed] (FLINK-16670) Support User-Defined Metrics in Python UDF
[ https://issues.apache.org/jira/browse/FLINK-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-16670. --- Resolution: Resolved > Support User-Defined Metrics in Python UDF > --- > > Key: FLINK-16670 > URL: https://issues.apache.org/jira/browse/FLINK-16670 > Project: Flink > Issue Type: New Feature > Components: API / Python >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.11.0 > > > This is the umbrella Jira for FLIP112, which intends to support User-Defined > Metrics in Python UDF. > FLIP wiki page: > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-112%3A+Support+User-Defined+Metrics+in++Python+UDF] > FLIP-58 adds the support for Python UDFs, but user-defined metrics have not > been supported yet. With metrics, users can report and monitor the UDF status > to get a deeper understanding of the execution. In this FLIP, we want to > support metrics for Python UDFs. In this FLIP we propose to: > * Support user-defined metrics including Counters, Gauges, Meters, > Distributions in Python UDFs. (Note: Histogram is not supported in this FLIP, > instead, Distributions is supported to report statistics about the > distribution of value. See more in the Distribution section.) > * Support defining user scopes. > * Support defining user variables. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * f128770ed2a5816a1f460734f736fd07b273e896 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043) * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-15836) Start a new pods watcher in KubernetesResourceManager when the old one is closed with exception
[ https://issues.apache.org/jira/browse/FLINK-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang updated FLINK-15836: -- Fix Version/s: (was: 1.10.1) > Start a new pods watcher in KubernetesResourceManager when the old one is > closed with exception > --- > > Key: FLINK-15836 > URL: https://issues.apache.org/jira/browse/FLINK-15836 > Project: Flink > Issue Type: Sub-task > Components: Deployment / Kubernetes >Reporter: Yang Wang >Assignee: Yang Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > As the discussion in the PR[1], if the {{watchReconnectLimit}} is configured > by users via java properties or environment, the watch may be stopped and all > the changes will not be processed properly. So we need to start a new pods > watcher in {{KubernetesResourceManager}} when the old one is closed with > exception. > [1]. [https://github.com/apache/flink/pull/10965#discussion_r373491974] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #11628: [FLINK-14504][rest] Add Cluster DataSet REST API
flinkbot edited a comment on issue #11628: [FLINK-14504][rest] Add Cluster DataSet REST API URL: https://github.com/apache/flink/pull/11628#issuecomment-608370153 ## CI report: * 44066c510cd12ae93baf839652b1bb2b43cc8cab Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158181648) Azure: [CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7038) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * f128770ed2a5816a1f460734f736fd07b273e896 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-16045) Extract connectors documentation to a top-level section
[ https://issues.apache.org/jira/browse/FLINK-16045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-16045. Fix Version/s: 1.11.0 Resolution: Fixed master: c3251f5a0f7e0b2dc9240aa94221ca860b0f1988 > Extract connectors documentation to a top-level section > --- > > Key: FLINK-16045 > URL: https://issues.apache.org/jira/browse/FLINK-16045 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.11.0
[jira] [Closed] (FLINK-16044) Extract libraries documentation to a top-level section
[ https://issues.apache.org/jira/browse/FLINK-16044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-16044. Fix Version/s: 1.11.0 Resolution: Fixed master: e25f0d3c322982291005369da8a5a7cf62d4e59a > Extract libraries documentation to a top-level section > -- > > Key: FLINK-16044 > URL: https://issues.apache.org/jira/browse/FLINK-16044 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h
[GitHub] [flink] aljoscha closed pull request #11367: [FLINK-16044] / [FLINK-16045] Make libraries/connectors documentation top-level sections
aljoscha closed pull request #11367: [FLINK-16044] / [FLINK-16045] Make libraries/connectors documentation top-level sections URL: https://github.com/apache/flink/pull/11367
[jira] [Created] (FLINK-16976) Update Chinese documentation for ListCheckpointed deprecation
Aljoscha Krettek created FLINK-16976: Summary: Update Chinese documentation for ListCheckpointed deprecation Key: FLINK-16976 URL: https://issues.apache.org/jira/browse/FLINK-16976 Project: Flink Issue Type: Bug Components: chinese-translation, Documentation Reporter: Aljoscha Krettek Fix For: 1.11.0 The change for the English documentation is in https://github.com/apache/flink/commit/10aadfc6906a1629f7e60eacf087e351ba40d517 The original Jira issue is FLINK-6258.
[GitHub] [flink] flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#issuecomment-599949839 ## CI report: * e5e11418358bf450d1cca543916bbd7d695375b1 UNKNOWN * ad481f5d846621032feb21e409690eed5b114191 UNKNOWN * 76235a9c945643047057064bda8ad49d3e90568c Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158210509) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7041) * 2a76c7689c032056a462a615c28f27f1361f1f0e Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158236401) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7045) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11629: [FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for filesystem table sink.
flinkbot edited a comment on issue #11629: [FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for filesystem table sink. URL: https://github.com/apache/flink/pull/11629#issuecomment-608466138 ## CI report: * 5a3f35f15271416d41248df519aa8d3ac1bbe5ea Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158224123) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7042) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11627: [FLINK-16770][e2e] Make the checkpoint resuming e2e case pass by increasing the…
flinkbot edited a comment on issue #11627: [FLINK-16770][e2e] Make the checkpoint resuming e2e case pass by increasing the… URL: https://github.com/apache/flink/pull/11627#issuecomment-608340311 ## CI report: * 59717320f953e6c4d3cfefe64c830fafee4d2c40 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158169993) Azure: [CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7037) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#issuecomment-599949839 ## CI report: * e5e11418358bf450d1cca543916bbd7d695375b1 UNKNOWN * ad481f5d846621032feb21e409690eed5b114191 UNKNOWN * 76235a9c945643047057064bda8ad49d3e90568c Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158210509) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7041) * 2a76c7689c032056a462a615c28f27f1361f1f0e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on issue #11571: [FLINK-16710][runtime] Log Upload blocks Main Thread in TaskExecutor
GJL commented on issue #11571: [FLINK-16710][runtime] Log Upload blocks Main Thread in TaskExecutor URL: https://github.com/apache/flink/pull/11571#issuecomment-608502360 Merging.
[GitHub] [flink] wangyang0918 commented on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous
wangyang0918 commented on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous URL: https://github.com/apache/flink/pull/11427#issuecomment-608499803 @azagrebin Thanks for your review. I have addressed most of your comments; some of the others no longer apply now that only `createTaskManagerPod` and `stopPod` are made asynchronous. Would you please take another look at your convenience?
[jira] [Closed] (FLINK-6258) Deprecate ListCheckpointed interface
[ https://issues.apache.org/jira/browse/FLINK-6258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-6258. --- Fix Version/s: 1.11.0 Release Note: The `ListCheckpointed` interface has been deprecated because it uses Java Serialization for checkpointing state which is problematic for savepoint compatibility. Use the `CheckpointedFunction` interface instead, which gives more control over state serialization. Resolution: Fixed master: 10aadfc6906a1629f7e60eacf087e351ba40d517 > Deprecate ListCheckpointed interface > > > Key: FLINK-6258 > URL: https://issues.apache.org/jira/browse/FLINK-6258 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Tzu-Li (Gordon) Tai >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Per discussion in https://github.com/apache/flink/pull/3508, we consider > deprecating the `ListCheckpointed` interface to discourage Java serialization > shortcuts for state registrations (towards this, the Java serialization > shortcuts provided by the `OperatorStateStore` interface have already been > deprecated in https://github.com/apache/flink/pull/3508). > We should also remember to update > https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state > if we decide to do this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] aljoscha closed pull request #11618: [FLINK-6258] Deprecate ListCheckpointed interface
aljoscha closed pull request #11618: [FLINK-6258] Deprecate ListCheckpointed interface URL: https://github.com/apache/flink/pull/11618
[GitHub] [flink] flinkbot edited a comment on issue #11629: [FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for filesystem table sink.
flinkbot edited a comment on issue #11629: [FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for filesystem table sink. URL: https://github.com/apache/flink/pull/11629#issuecomment-608466138 ## CI report: * 5a3f35f15271416d41248df519aa8d3ac1bbe5ea Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158224123) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7042) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052 ## CI report: * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN * 48c7c6996d88457bc4ab2cf0f9a8060c76499cfd Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158007513) Azure: [CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7005) * f128770ed2a5816a1f460734f736fd07b273e896 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on issue #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on issue #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#issuecomment-608491962 I think the last part of my comment was answered by #11353.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#discussion_r403037228 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ## @@ -153,6 +155,8 @@ public SlotManagerImpl( this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease; this.defaultWorkerResourceSpec = defaultWorkerResourceSpec; this.numSlotsPerWorker = numSlotsPerWorker; + this.defaultSlotResourceProfile = defaultWorkerResourceSpec != null ? + generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker) : null; Review comment: Instead of setting `defaultSlotResourceProfile` to `null`, can't we set it to `ResourceProfile.UNKNOWN` or something like this?
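The suggestion here is the null-object pattern: a well-known `UNKNOWN` sentinel lets callers compare, log, and pass the value around without null checks. A hypothetical sketch of that shape, with illustrative names that are not Flink's actual API:

```java
// Hypothetical stand-in for ResourceProfile: "no default worker resource"
// is modeled by a shared UNKNOWN sentinel rather than by null.
final class SlotProfile {
    static final SlotProfile UNKNOWN = new SlotProfile(-1.0);

    final double cpuCores;

    private SlotProfile(double cpuCores) {
        this.cpuCores = cpuCores;
    }

    static SlotProfile of(double cpuCores) {
        return new SlotProfile(cpuCores);
    }

    // Mirrors the ternary under review, with UNKNOWN replacing null.
    // The per-slot CPU share here is just an assumed example computation.
    static SlotProfile defaultSlotProfile(Double defaultWorkerCpu, int numSlotsPerWorker) {
        return defaultWorkerCpu != null
            ? of(defaultWorkerCpu / numSlotsPerWorker)
            : UNKNOWN;
    }
}
```

A caller can then test `profile == SlotProfile.UNKNOWN` where the original code would have needed a null check, and accidental dereferences no longer throw `NullPointerException`.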
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#discussion_r403037763 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ## @@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) return false; } - private Optional allocateResource(ResourceProfile resourceProfile) { - final Collection requestedSlots = resourceActions.allocateResource(resourceProfile); + private Optional allocateResource(ResourceProfile requestedSlotResourceProfile) { + if (defaultWorkerResourceSpec == null) { + // standalone mode, cannot allocate resource + return Optional.empty(); + } Review comment: I think here we are leaking details from the enclosing component into the `SlotManager`. I think the `SlotManager` should not decide for the `ResourceManager` whether it can allocate new resources or not. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#discussion_r403060510 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1051,13 +1050,13 @@ protected abstract void internalDeregisterApplication( @Nullable String optionalDiagnostics) throws ResourceManagerException; /** -* Allocates a resource using the resource profile. +* Allocates a resource using the worker resource specification. * -* @param resourceProfile The resource description +* @param workerResourceSpec that describes the to be allocated worker. Review comment: ```suggestion * @param workerResourceSpec that describes the to be allocated worker ``` param, return or throws tags are usually not terminated by a period.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#discussion_r403048662 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java ## @@ -52,6 +53,9 @@ */ public class SlotManagerFailUnfulfillableTest extends TestLogger { + private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = new WorkerResourceSpec( + 100.0, 1, 1, 1, 1); Review comment: Looking at this line, I would recommend introducing a builder for the `WorkerResourceSpec` and making the default constructor private.
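The motivation for the builder is readability: `new WorkerResourceSpec(100.0, 1, 1, 1, 1)` gives the reader no idea which argument is which. A hedged sketch of what the suggested builder could look like; the five fields and their names are assumptions inferred from the five positional constructor arguments, not Flink's actual definition:

```java
// Sketch of a builder for a worker resource spec. Field names (CPU cores
// plus four memory sizes in MB) are assumed for illustration.
final class WorkerSpec {
    final double cpuCores;
    final int taskHeapMB;
    final int taskOffHeapMB;
    final int networkMB;
    final int managedMB;

    private WorkerSpec(Builder b) {
        this.cpuCores = b.cpuCores;
        this.taskHeapMB = b.taskHeapMB;
        this.taskOffHeapMB = b.taskOffHeapMB;
        this.networkMB = b.networkMB;
        this.managedMB = b.managedMB;
    }

    static final class Builder {
        // Defaults keep test code short: only deviations need to be spelled out.
        private double cpuCores = 1.0;
        private int taskHeapMB = 100;
        private int taskOffHeapMB = 0;
        private int networkMB = 100;
        private int managedMB = 100;

        Builder setCpuCores(double cpuCores) { this.cpuCores = cpuCores; return this; }
        Builder setTaskHeapMB(int mb) { this.taskHeapMB = mb; return this; }

        WorkerSpec build() { return new WorkerSpec(this); }
    }
}
```

A call site then reads `new WorkerSpec.Builder().setCpuCores(100.0).build()`, which names the value being set instead of relying on argument order, and making the constructor private forces all construction through the builder.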
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#discussion_r403053905 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) { // Resource Management // - protected int getNumberRequiredTaskManagerSlots() { - return slotManager.getNumberPendingTaskManagerSlots(); + protected int getNumberRequiredTaskManagers() { + return getPendingWorkerNums().values().stream().reduce(0, Integer::sum); Review comment: For the time being I would add a check state which ensures that the `WorkerResourceSpecs` can all be fulfilled by the container size we are using to start new containers/pods.
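The reviewed line sums pending worker counts across all `WorkerResourceSpec` keys, and the suggested check-state guards against a pending spec that the single configured container size cannot fulfill. A sketch of both ideas together, with the spec type simplified to `String` (names here are illustrative, not Flink's code):

```java
import java.util.Map;

final class PendingWorkerTotals {

    // Sums pending workers over all specs, after the suggested sanity check
    // that every pending spec matches the one container spec used to start
    // new containers/pods.
    static int totalPendingWorkers(Map<String, Integer> pendingWorkerNums, String containerSpec) {
        for (String spec : pendingWorkerNums.keySet()) {
            if (!spec.equals(containerSpec)) {
                throw new IllegalStateException("Pending spec " + spec
                    + " cannot be fulfilled by container spec " + containerSpec);
            }
        }
        // Same shape as the reviewed line: values().stream().reduce(0, Integer::sum)
        return pendingWorkerNums.values().stream().reduce(0, Integer::sum);
    }
}
```

With the check in place, the total is only reported when it is actually achievable by starting containers of the configured size; otherwise the inconsistency fails fast instead of surfacing later as workers that never match their requests.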
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#discussion_r403032569 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ## @@ -133,14 +133,17 @@ @Nullable private final WorkerResourceSpec defaultWorkerResourceSpec; + private final int numSlotsPerWorker; + public SlotManagerImpl( SlotMatchingStrategy slotMatchingStrategy, ScheduledExecutor scheduledExecutor, Time taskManagerRequestTimeout, Time slotRequestTimeout, Time taskManagerTimeout, boolean waitResultConsumedBeforeRelease, - @Nullable WorkerResourceSpec defaultWorkerResourceSpec) { + @Nullable WorkerResourceSpec defaultWorkerResourceSpec, + int numSlotsPerWorker) { Review comment: Maybe it would make sense to pass in the `SlotManagerConfiguration`. If this is possible, then one would not have to add another field to the constructor every time we add a new configuration parameter for the `SlotManager`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054549

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java

## @@ -207,6 +208,18 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
 	}

 	@Override
+	public Map<WorkerResourceSpec, Integer> getPendingWorkerNums() {
+		final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);
+		if (pendingWorkerNum > 0) {
+			return Collections.singletonMap(
+				Preconditions.checkNotNull(defaultWorkerResourceSpec,
+					"There should never be pending slots/workers in standalone mode."),

Review comment: I think this should not be necessary.
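The pending-worker computation from the diff is a ceiling division of pending slots by slots-per-worker. A self-contained sketch, where a plain `String` stands in for the `defaultWorkerResourceSpec` key:

```java
import java.util.Collections;
import java.util.Map;

// Sketch of the computation in the diff: the number of pending workers is the
// number of pending slots divided by slots-per-worker, rounded up. The key
// type is a placeholder for WorkerResourceSpec.
class PendingWorkersSketch {

	static Map<String, Integer> getPendingWorkerNums(
			int numPendingSlots, int numSlotsPerWorker, String defaultSpec) {
		final int pendingWorkerNum =
			(int) Math.ceil((double) numPendingSlots / numSlotsPerWorker);
		if (pendingWorkerNum > 0) {
			// Same singleton-map shape as the diff.
			return Collections.singletonMap(defaultSpec, pendingWorkerNum);
		}
		return Collections.emptyMap();
	}
}
```

For example, 5 pending slots with 4 slots per worker require 2 workers, since one worker alone leaves a slot uncovered.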
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054142

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java

## @@ -51,7 +53,7 @@
 	int getNumberFreeSlotsOf(InstanceID instanceId);

-	int getNumberPendingTaskManagerSlots();
+	Map<WorkerResourceSpec, Integer> getPendingWorkerNums();

Review comment: `JavaDoc` missing.
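A JavaDoc along the following lines would address the comment; the wording is an assumption based on what the method returns in the diff, and `WorkerResourceSpec` is stood in by a placeholder class.

```java
import java.util.Map;

// Sketch of the missing JavaDoc for the new interface method.
interface SlotManagerSketch {

	/** Placeholder for Flink's WorkerResourceSpec. */
	final class WorkerResourceSpec { }

	/**
	 * Returns the number of pending workers per worker resource spec, i.e.
	 * workers that have been requested but have not yet registered.
	 *
	 * @return mapping from worker resource spec to the number of pending
	 *         workers with that spec
	 */
	Map<WorkerResourceSpec, Integer> getPendingWorkerNums();
}
```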
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403056960

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java

## @@ -156,8 +156,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
 	}

 	@Override
-	public boolean startNewWorker(ResourceProfile resourceProfile) {
-		LOG.info("Starting new worker with resource profile, {}", resourceProfile);
+	public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
+		LOG.info("Starting new worker with worker resource spec, {}", workerResourceSpec);
 		requestKubernetesPod();

Review comment: I guess that we need a check that the configured pod fulfills `workerResourceSpec`.
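One way the suggested check could be wired into `startNewWorker` is sketched below. All classes are simplified placeholders (the real `KubernetesResourceManager` talks to the Kubernetes client via `requestKubernetesPod()`); the shape of the check, not the names, is the point.

```java
// Sketch, assuming placeholder types: before requesting a pod, verify that
// the configured pod size can fulfil the requested WorkerResourceSpec, and
// refuse the request otherwise.
class StartWorkerSketch {

	/** Simplified stand-in for Flink's WorkerResourceSpec. */
	static final class WorkerResourceSpec {
		final double cpuCores;
		final int memoryMb;

		WorkerResourceSpec(double cpuCores, int memoryMb) {
			this.cpuCores = cpuCores;
			this.memoryMb = memoryMb;
		}
	}

	private final double podCpu;
	private final int podMemoryMb;
	private boolean podRequested = false;

	StartWorkerSketch(double podCpu, int podMemoryMb) {
		this.podCpu = podCpu;
		this.podMemoryMb = podMemoryMb;
	}

	boolean startNewWorker(WorkerResourceSpec spec) {
		// The check the reviewer suggests: the configured pod must be at
		// least as large as the requested worker spec.
		if (spec.cpuCores > podCpu || spec.memoryMb > podMemoryMb) {
			return false; // configured pod cannot fulfil this spec
		}
		podRequested = true; // stands in for requestKubernetesPod()
		return true;
	}

	boolean isPodRequested() {
		return podRequested;
	}
}
```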