[GitHub] [flink] XuQianJin-Stars commented on a change in pull request #11186: [FLINK-16200][sql] Support JSON_EXISTS for blink planner

2020-04-03 Thread GitBox
XuQianJin-Stars commented on a change in pull request #11186: 
[FLINK-16200][sql] Support JSON_EXISTS for blink planner
URL: https://github.com/apache/flink/pull/11186#discussion_r403428053
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/JsonFunctionsTest.scala
 ##
 @@ -125,4 +127,53 @@ class JsonFunctionsTest extends ExpressionTestBase {
 }
   }
 
+  @Test
+  def testJsonExists(): Unit = {
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo' false on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo' true on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo' unknown on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo' false on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo' true on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo' unknown on error)", "true")
+    testSqlApi("json_exists('{}', "
+      + "'invalid $.foo' false on error)", "false")
+    testSqlApi("json_exists('{}', "
+      + "'invalid $.foo' true on error)", "true")
+    testSqlApi("json_exists('{}', "
+      + "'invalid $.foo' unknown on error)", "null")
+    testSqlApi("json_exists(cast('{\"foo\":\"bar\"}' as varchar), "
+      + "'strict $.foo1')", "false")
+
+    // not exists
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo1' false on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo1' true on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo1' unknown on error)", "null")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' true on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' false on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' error on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' unknown on error)", "false")
+
+    // nulls
+    testSqlApi("json_exists(cast(null as varchar), 'lax $' unknown on error)", "null")
+  }
+
+  @Test
+  def testJsonFuncError(): Unit = {
+    expectedException.expect(classOf[CodeGenException])
+    expectedException.expectMessage(startsWith("Unsupported call: JSON_EXISTS"))
 
 Review comment:
   > This exception message is still misleading. We already support `JSON_EXISTS`, so why does the exception say it is unsupported? I think we should improve the exception to give a more understandable message, e.g. `the json path 'lax $' is illegal.`
   
   Because the call `JSON_EXISTS(INT, CHAR(5) NOT NULL, RAW('org.apache.calcite.sql.SqlJsonExistsErrorBehavior', ?))` is not supported. An illegal JSON path would just return `false`, like in `verifyException`.
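To make the expectations in `testJsonExists` easier to read, here is a hypothetical Scala model of the JSON_EXISTS semantics those cases encode. It is not Flink's or Calcite's implementation, and every name in it is made up for illustration. It assumes the JSON input is already parsed into a flat `Map[String, String]`, supports only `$` and one-level `$.member` paths, and uses `None` for SQL `NULL`/UNKNOWN. In strict mode a missing member is treated as an error and resolved by the ON ERROR clause; in lax mode it simply yields `false`; an unrecognized path mode is always an error; and FALSE ON ERROR is assumed as the default, matching the `'strict $.foo1'` case that has no ON ERROR clause.

```scala
object JsonExistsModel {
  // Hypothetical model of the ON ERROR clause.
  sealed trait OnError
  case object FalseOnError extends OnError
  case object TrueOnError extends OnError
  case object UnknownOnError extends OnError
  case object ErrorOnError extends OnError

  final class JsonPathError(msg: String) extends RuntimeException(msg)

  // Evaluates "strict $.x", "lax $.x" or "lax $" against a flat, pre-parsed object.
  private def evaluate(doc: Map[String, String], path: String): Boolean =
    path.trim.split("\\s+", 2) match {
      case Array(mode, expr) if mode == "strict" || mode == "lax" =>
        if (expr == "$") true
        else if (expr.startsWith("$.")) {
          val found = doc.contains(expr.drop(2))
          // strict mode: a missing member is an error, handled by ON ERROR
          if (!found && mode == "strict") throw new JsonPathError(s"No member for path: $expr")
          found // lax mode: a missing member just yields false
        } else throw new JsonPathError(s"Unsupported path expression: $expr")
      case _ => throw new JsonPathError(s"Illegal JSON path: $path")
    }

  // None as input or result stands for SQL NULL / UNKNOWN.
  def jsonExists(doc: Option[Map[String, String]], path: String,
                 onError: OnError = FalseOnError): Option[Boolean] =
    doc match {
      case None => None // NULL input yields NULL
      case Some(d) =>
        try Some(evaluate(d, path))
        catch {
          case e: JsonPathError => onError match {
            case FalseOnError   => Some(false)
            case TrueOnError    => Some(true)
            case UnknownOnError => None
            case ErrorOnError   => throw e
          }
        }
    }
}
```

For example, `jsonExists(Some(Map("foo" -> "bar")), "strict $.foo1", UnknownOnError)` returns `None`, mirroring the `"null"` expectation in the test, while the same path in lax mode returns `Some(false)` regardless of the ON ERROR clause.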


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] XuQianJin-Stars commented on a change in pull request #11186: [FLINK-16200][sql] Support JSON_EXISTS for blink planner

2020-04-03 Thread GitBox
XuQianJin-Stars commented on a change in pull request #11186: 
[FLINK-16200][sql] Support JSON_EXISTS for blink planner
URL: https://github.com/apache/flink/pull/11186#discussion_r403428053
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/JsonFunctionsTest.scala
 ##
 @@ -125,4 +127,53 @@ class JsonFunctionsTest extends ExpressionTestBase {
 }
   }
 
+  @Test
+  def testJsonExists(): Unit = {
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo' false on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo' true on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo' unknown on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo' false on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo' true on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo' unknown on error)", "true")
+    testSqlApi("json_exists('{}', "
+      + "'invalid $.foo' false on error)", "false")
+    testSqlApi("json_exists('{}', "
+      + "'invalid $.foo' true on error)", "true")
+    testSqlApi("json_exists('{}', "
+      + "'invalid $.foo' unknown on error)", "null")
+    testSqlApi("json_exists(cast('{\"foo\":\"bar\"}' as varchar), "
+      + "'strict $.foo1')", "false")
+
+    // not exists
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo1' false on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo1' true on error)", "true")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'strict $.foo1' unknown on error)", "null")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' true on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' false on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' error on error)", "false")
+    testSqlApi("json_exists('{\"foo\":\"bar\"}', "
+      + "'lax $.foo1' unknown on error)", "false")
+
+    // nulls
+    testSqlApi("json_exists(cast(null as varchar), 'lax $' unknown on error)", "null")
+  }
+
+  @Test
+  def testJsonFuncError(): Unit = {
+    expectedException.expect(classOf[CodeGenException])
+    expectedException.expectMessage(startsWith("Unsupported call: JSON_EXISTS"))
 
 Review comment:
   > This exception message is still misleading. We already support `JSON_EXISTS`, so why does the exception say it is unsupported? I think we should improve the exception to give a more understandable message, e.g. `the json path 'lax $' is illegal.`
   
   Because the call `JSON_EXISTS(INT, CHAR(5) NOT NULL, RAW('org.apache.calcite.sql.SqlJsonExistsErrorBehavior', ?))` is not supported. An illegal JSON path would just return `false`.
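The rejected call in this thread is `JSON_EXISTS(INT, CHAR(5) NOT NULL, ...)`, i.e. the JSON input argument is an `INT`, while the path `'lax $'` itself is fine. One way to reconcile both comments is an error that names the offending input type. The following is a hypothetical Scala sketch only: the object name, the accepted type list and the message wording are all assumptions, not Flink code.

```scala
object JsonExistsArgCheck {
  // Assumed set of SQL type roots accepted for the JSON input argument.
  private val characterTypes = Set("CHAR", "VARCHAR")

  /** Returns Left(message) naming the unsupported input type, or Right(()) if it is accepted. */
  def checkInputType(inputType: String): Either[String, Unit] = {
    // Strip precision and nullability, e.g. "CHAR(5) NOT NULL" -> "CHAR".
    val root = inputType.takeWhile(c => c != '(' && c != ' ').toUpperCase
    if (characterTypes.contains(root)) Right(())
    else Left(s"JSON_EXISTS expects a character string as its first argument, but got $inputType.")
  }
}
```

With this shape, `checkInputType("INT")` produces a message pointing at the argument type rather than a generic "Unsupported call".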


[GitHub] [flink] flinkbot edited a comment on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11630: [FLINK-16921][e2e] Describe all 
resources and show pods logs before cleanup when failed
URL: https://github.com/apache/flink/pull/11630#issuecomment-608974004
 
 
   
   ## CI report:
   
   * 9a9073f279a1bc2bb44352199134c372825cb769 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158328185) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7054)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed

2020-04-03 Thread GitBox
flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources 
and show pods logs before cleanup when failed
URL: https://github.com/apache/flink/pull/11630#issuecomment-608974004
 
 
   
   ## CI report:
   
   * 9a9073f279a1bc2bb44352199134c372825cb769 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed

2020-04-03 Thread GitBox
flinkbot commented on issue #11630: [FLINK-16921][e2e] Describe all resources 
and show pods logs before cleanup when failed
URL: https://github.com/apache/flink/pull/11630#issuecomment-608972585
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9a9073f279a1bc2bb44352199134c372825cb769 (Sat Apr 04 
04:38:33 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


[GitHub] [flink] wangyang0918 opened a new pull request #11630: [FLINK-16921][e2e] Describe all resources and show pods logs before cleanup when failed

2020-04-03 Thread GitBox
wangyang0918 opened a new pull request #11630: [FLINK-16921][e2e] Describe all 
resources and show pods logs before cleanup when failed
URL: https://github.com/apache/flink/pull/11630
 
 
   
   
   ## What is the purpose of the change
   
   The pods may stay pending because of insufficient resources, disk pressure, or other problems, in which case `wait_rest_endpoint_up` times out. Describing all resources will help to debug these problems.
   
   We still see some failures that we cannot reproduce in a local environment (Mac/Linux). This PR is opened to run the e2e tests more times to find the root cause.
   
   ## Brief change log
   * Describe all resources so that we can find more information about why the K8s e2e tests failed
   * The debug log sometimes does not show up, so move `debug_and_show_logs` before `cleanup`
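The reordering in the change log can be illustrated with a small Scala sketch. The actual e2e tests are shell scripts; here `runWithDebugOnFailure` is a hypothetical helper and its function parameters merely stand in for the test body, `debug_and_show_logs` and `cleanup`. The point is only the ordering: debug output is captured while the pods still exist, and cleanup runs afterwards even if collecting the debug output fails.

```scala
object E2eFailureHandling {
  // Hypothetical runner: on failure, debug output (describe resources, show pod logs)
  // is collected first; cleanup always runs afterwards, even if debugging throws.
  def runWithDebugOnFailure(test: () => Unit,
                            debugAndShowLogs: () => Unit,
                            cleanup: () => Unit): Boolean =
    try {
      try { test(); true }
      catch { case _: Exception => debugAndShowLogs(); false }
    } finally cleanup()
}
```

Running `cleanup()` first would delete the pods whose descriptions and logs are needed, which is why the debug step must come before it.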
   
   
   ## Verifying this change
   
   * Run the e2e tests more times; the K8s-related tests should pass
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


[GitHub] [flink] liuzhixing1006 commented on issue #11455: [FLINK-16098] [chinese-translation, Documentation] Translate "Overvie…

2020-04-03 Thread GitBox
liuzhixing1006 commented on issue #11455: [FLINK-16098] [chinese-translation, 
Documentation] Translate "Overvie…
URL: https://github.com/apache/flink/pull/11455#issuecomment-608969822
 
 
   Hi @wuchong @JingsongLi, it seems that this PR has met the standard. Could you help merge it? Thank you!


[jira] [Commented] (FLINK-16931) Large _metadata file lead to JobManager not responding when restart

2020-04-03 Thread Lu Niu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17075023#comment-17075023
 ] 

Lu Niu commented on FLINK-16931:


Thanks in advance! Will share the plan later.

> Large _metadata file lead to JobManager not responding when restart
> ---
>
> Key: FLINK-16931
> URL: https://issues.apache.org/jira/browse/FLINK-16931
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0, 1.11.0
>Reporter: Lu Niu
>Assignee: Lu Niu
>Priority: Critical
> Fix For: 1.11.0
>
>
> When the _metadata file is big, the JobManager can never recover from the checkpoint.
> It falls into a loop of fetch checkpoint -> JM timeout -> restart. Here is the
> related log:
> {code:java}
>  2020-04-01 17:08:25,689 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 
> 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The 
> heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, it looks like ExecutionGraph::restart runs in the JobMaster
> main thread and eventually calls
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which downloads the
> file from DFS. The main thread is basically blocked for a while because of
> this. One possible solution is to make the downloading part async. More
> things might need to be considered, as the original change tried to make it
> single-threaded. [https://github.com/apache/flink/pull/7568]
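A minimal Scala sketch of the "make the downloading part async" idea, under loud assumptions: `downloadCheckpoint` is a hypothetical stand-in that only sleeps (it is not `ZooKeeperCompletedCheckpointStore`), and a plain Scala `Future` on a dedicated pool replaces whatever executor Flink would actually use. It shows only the threading shape: the blocking reads run on an I/O pool and the caller receives a future instead of being blocked.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncCheckpointRetrieval {
  // Hypothetical stand-in for the blocking DFS read; it only sleeps to simulate slow I/O.
  def downloadCheckpoint(id: Long): String = {
    Thread.sleep(20)
    s"checkpoint-$id"
  }

  // Schedules every download on the given I/O pool and returns immediately, so the
  // calling (main) thread stays free, e.g. to answer heartbeats.
  // Future.traverse keeps the results in the order of `ids`.
  def retrieveAll(ids: Seq[Long])(implicit io: ExecutionContext): Future[Seq[String]] =
    Future.traverse(ids)(id => Future(downloadCheckpoint(id)))

  // Demo driver: only here, for the demo, does the caller block on the result.
  def demo(): Seq[String] = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val io: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(retrieveAll(Seq(50L, 51L, 52L)), 5.seconds)
    finally pool.shutdown()
  }
}
```

In the JobMaster the completed result would still have to be handed back to the main thread, which is where the single-threading concern from the linked PR comes in; the sketch does not model that step.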



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16724) ListSerializer cannot serialize list which containers null

2020-04-03 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074991#comment-17074991
 ] 

Congxian Qiu(klion26) commented on FLINK-16724:
---

[~nppoly] What do you think about my comment? If you agree with it, could you
please close this ticket?

PS: FLINK-16916 has been fixed, so you can now try to use {{NullableSerializer}}
in the new master codebase.

> ListSerializer cannot serialize list which containers null
> --
>
> Key: FLINK-16724
> URL: https://issues.apache.org/jira/browse/FLINK-16724
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Chongchen Chen
>Priority: Major
> Attachments: list_serializer_err.diff
>
>
> MapSerializer handles null values correctly, but ListSerializer doesn't. The
> attachment is a modification of the unit test that reproduces the bug.
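A common way to let a list serializer accept null elements is to write a per-element null marker before each value. The following is a self-contained Scala sketch of that idea using plain `DataOutputStream`/`DataInputStream` and `String` elements rather than Flink's `TypeSerializer` API; `NullAwareListCodec` is a hypothetical name and this is not the fix adopted for this ticket.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object NullAwareListCodec {
  // Writes the size, then for each element a boolean "is null" marker,
  // followed by the element itself only when it is non-null.
  def serialize(list: Seq[String], out: DataOutputStream): Unit = {
    out.writeInt(list.size)
    list.foreach { elem =>
      out.writeBoolean(elem == null)
      if (elem != null) out.writeUTF(elem)
    }
  }

  // Mirrors serialize: read the marker first and read a value only if it is non-null.
  def deserialize(in: DataInputStream): Seq[String] = {
    val size = in.readInt()
    (0 until size).map(_ => if (in.readBoolean()) null else in.readUTF())
  }

  // Round trip through an in-memory byte array.
  def roundTrip(list: Seq[String]): Seq[String] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    serialize(list, out)
    out.flush()
    deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

The marker costs one byte per element; wrapping only the element serializer keeps the list format itself unchanged apart from that byte.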



[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime 
metric for task.
URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084
 
 
   
   ## CI report:
   
   * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
   * 7da912a7e2bd854fec07c6eb6e7784fba30df765 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158315901) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7053)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime metric for task.

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11564: [FLINK-16864][metrics] Add IdleTime 
metric for task.
URL: https://github.com/apache/flink/pull/11564#issuecomment-605967084
 
 
   
   ## CI report:
   
   * b709e1952df66cba5a316f9a46902538bf8cf245 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158128040) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7015)
 
   * dfc6f9642a2fe6fca383707a11d53ef6ed2ea381 UNKNOWN
   * 7da912a7e2bd854fec07c6eb6e7784fba30df765 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158289333) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7051)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158289333) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7051)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance 
AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566
 
 
   
   ## CI report:
   
   * c4207aa80f6279d013a51b9104997e840716640e Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158278462) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7049)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)
 
   * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158289333) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7051)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance 
AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566
 
 
   
   ## CI report:
   
   * c4207aa80f6279d013a51b9104997e840716640e Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158278462) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7049)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093
 
 
   
   ## CI report:
   
   * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN
   * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN
   * a8335f0def293f04fd74bfc47c1b31fc98a46a23 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158276096) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7048)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)
 
   * 07040ddd9344e4c6cf8e6a7d095cdf7471ebdf2e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


[jira] [Closed] (FLINK-16710) Log Upload blocks Main Thread in TaskExecutor

2020-04-03 Thread Gary Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-16710.

Resolution: Fixed

master: dbf0c4c5914d11b6c1209f089ed014db8cd733cb

> Log Upload blocks Main Thread in TaskExecutor
> -
>
> Key: FLINK-16710
> URL: https://issues.apache.org/jira/browse/FLINK-16710
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Assignee: wangsan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Uploading logs to the BlobServer blocks the TaskExecutor's main thread. We 
> should introduce an IO thread pool that carries out file system accesses 
> (listing files in a directory, checking if a file exists, uploading files).
> Affected RPCs:
>  * {{TaskExecutor#requestLogList(Time)}}
>  * {{TaskExecutor#requestFileUploadByName(String, Time)}}
>  * {{TaskExecutor#requestFileUploadByType(FileType, Time)}}
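A minimal Scala sketch of the proposed IO thread pool, assuming a hypothetical `LogListService` class (the real change lives in Flink's `TaskExecutor`, whose code is not shown here). The directory listing behind a `requestLogList`-style call is submitted to a separate executor via `CompletableFuture.supplyAsync`, so the caller gets a future back instead of blocking on the file system.

```scala
import java.io.File
import java.util.concurrent.{CompletableFuture, ExecutorService}
import java.util.function.Supplier

// Hypothetical service: file-system work for a log-list request is handed to a
// dedicated I/O executor instead of running on the RPC main thread.
class LogListService(logDir: File, ioExecutor: ExecutorService) {
  def requestLogList(): CompletableFuture[Seq[String]] =
    CompletableFuture.supplyAsync(new Supplier[Seq[String]] {
      override def get(): Seq[String] = {
        // listFiles() returns null if logDir is missing or unreadable.
        val files = Option(logDir.listFiles()).getOrElse(Array.empty[File])
        files.filter(_.isFile).map(_.getName).sorted.toSeq
      }
    }, ioExecutor)
}
```

The same pattern would apply to the existence check and the upload behind the other two RPCs; only the body of the supplier changes.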



[GitHub] [flink] GJL closed pull request #11571: [FLINK-16710][runtime] Log Upload blocks Main Thread in TaskExecutor

2020-04-03 Thread GitBox
GJL closed pull request #11571: [FLINK-16710][runtime] Log Upload blocks Main 
Thread in TaskExecutor
URL: https://github.com/apache/flink/pull/11571
 
 
   


[jira] [Reopened] (FLINK-16921) "kubernetes session test" is unstable

2020-04-03 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reopened FLINK-16921:


The test failed again: 
https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7046=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5

> "kubernetes session test" is unstable
> -
>
> Key: FLINK-16921
> URL: https://issues.apache.org/jira/browse/FLINK-16921
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> CI: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6915=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> I assume some services didn't come up?
> {code}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>  Connection refused: /10.1.0.4:30095
> {code}
> Full log
> {code}
> 2020-04-01T09:13:59.0673858Z Successfully built ba628fa7af0d
> 2020-04-01T09:13:59.0726818Z Successfully tagged 
> test_kubernetes_session:latest
> 2020-04-01T09:13:59.2547709Z 
> clusterrolebinding.rbac.authorization.k8s.io/flink-role-binding-default 
> created
> 2020-04-01T09:14:00.0586087Z 2020-04-01 09:14:00,055 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2020-04-01T09:14:00.0608876Z 2020-04-01 09:14:00,060 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2020-04-01T09:14:00.0611236Z 2020-04-01 09:14:00,060 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.heap.size, 1024m
> 2020-04-01T09:14:00.0613869Z 2020-04-01 09:14:00,061 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: taskmanager.memory.process.size, 1728m
> 2020-04-01T09:14:00.0616344Z 2020-04-01 09:14:00,061 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-04-01T09:14:00.0619384Z 2020-04-01 09:14:00,061 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: parallelism.default, 1
> 2020-04-01T09:14:00.0624467Z 2020-04-01 09:14:00,062 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-04-01T09:14:00.9838038Z 2020-04-01 09:14:00,983 INFO  
> org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The 
> derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is 
> less than its min value 192.000mb (201326592 bytes), min value will be used 
> instead
> 2020-04-01T09:14:00.9922554Z 2020-04-01 09:14:00,991 INFO  
> org.apache.flink.kubernetes.utils.KubernetesUtils[] - Kubernetes 
> deployment requires a fixed port. Configuration blob.server.port will be set 
> to 6124
> 2020-04-01T09:14:00.9927409Z 2020-04-01 09:14:00,992 INFO  
> org.apache.flink.kubernetes.utils.KubernetesUtils[] - Kubernetes 
> deployment requires a fixed port. Configuration taskmanager.rpc.port will be 
> set to 6122
> 2020-04-01T09:14:01.0587014Z 2020-04-01 09:14:01,058 WARN  
> org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] 
> - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop 
> Configuration ConfigMap.
> 2020-04-01T09:14:01.0592498Z 2020-04-01 09:14:01,059 WARN  
> org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] 
> - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop 
> Configuration ConfigMap.
> 2020-04-01T09:14:01.8684880Z 2020-04-01 09:14:01,868 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create 
> flink session cluster flink-native-k8s-session-1 successfully, JobManager Web 
> Interface: http://10.1.0.4:30095
> 2020-04-01T09:14:03.2952029Z Executing WordCount example with default input 
> data set.
> 2020-04-01T09:14:03.2955365Z Use --input to specify file input.
> 2020-04-01T09:15:31.5606577Z 
> 2020-04-01T09:15:31.5610358Z 
> 
> 2020-04-01T09:15:31.5610913Z  The program finished with the following 
> exception:
> 2020-04-01T09:15:31.564Z 
> 2020-04-01T09:15:31.5611772Z 
> 
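The `ProcessMemoryUtils` line in the log above is arithmetic, not a failure: the JVM overhead is derived as a fraction of `taskmanager.memory.process.size` and then clamped into a `[min, max]` range. The sketch below reproduces the numbers, assuming the defaults for this version are fraction 0.1, minimum 192 MB and maximum 1 GB (check the `taskmanager.memory.jvm-overhead.*` options for your release). `JvmOverheadSketch` is a hypothetical name, and it ignores the other memory components Flink derives alongside this value. Flink's units are binary: 1728m is 1728 × 1024 × 1024 = 1,811,939,328 bytes, and 10% of that is about 172.8 MB (the log prints 181193935 bytes, a few bytes off the exact product because of rounding).

```java
/** Hypothetical sketch of the fraction-then-clamp rule behind the ProcessMemoryUtils log line. */
final class JvmOverheadSketch {

    /** Flink's "mb" in these log lines is binary: 1 MiB = 1024 * 1024 bytes. */
    static final long MIB = 1024L * 1024L;

    private JvmOverheadSketch() {}

    /** Takes `fraction` of the total process memory, then clamps it into [minBytes, maxBytes]. */
    static long deriveJvmOverhead(long processBytes, double fraction, long minBytes, long maxBytes) {
        long derived = (long) (processBytes * fraction);
        if (derived < minBytes) {
            return minBytes; // the case in the log: about 172.8 MiB < 192 MiB, so the minimum wins
        }
        return Math.min(derived, maxBytes);
    }

    public static void main(String[] args) {
        long process = 1728 * MIB; // taskmanager.memory.process.size: 1728m
        long overhead = deriveJvmOverhead(process, 0.1, 192 * MIB, 1024 * MIB);
        System.out.println(overhead); // 201326592 (= 192 MiB)
    }
}
```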

[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance 
AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566
 
 
   
   ## CI report:
   
   * 668797635ab529ea21ef234a1f99747cfb4d898a Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158015561) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7006)
 
   * c4207aa80f6279d013a51b9104997e840716640e Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158278462) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7049)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093
 
 
   
   ## CI report:
   
   * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN
   * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN
   * 49d5d3e1e8e1bf962716241b79db1f2e28e506b4 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158188400) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7040)
 
   * a8335f0def293f04fd74bfc47c1b31fc98a46a23 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158276096) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7048)
 
   
   


[GitHub] [flink] flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11624: [FLINK-16949] Enhance 
AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
URL: https://github.com/apache/flink/pull/11624#issuecomment-608048566
 
 
   
   ## CI report:
   
   * 668797635ab529ea21ef234a1f99747cfb4d898a Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158015561) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7006)
 
   * c4207aa80f6279d013a51b9104997e840716640e UNKNOWN
   
   


[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093
 
 
   
   ## CI report:
   
   * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN
   * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN
   * 49d5d3e1e8e1bf962716241b79db1f2e28e506b4 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158188400) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7040)
 
   * a8335f0def293f04fd74bfc47c1b31fc98a46a23 UNKNOWN
   
   


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture

2020-04-03 Thread GitBox
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add 
docs for components and distributed architecture
URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403225047
 
 

 ##
 File path: docs/concepts/distributed_architecture.md
 ##
 @@ -0,0 +1,99 @@
+---
+title: Distributed Architecture 
+nav-id: dist-arch
+nav-pos: 2
 
 Review comment:
   I think this should come after the logical functions page, what do you think?
   ```suggestion
   nav-pos: 3
   ```




[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)
 
   
   


[GitHub] [flink] Myasuka commented on a change in pull request #11624: [FLINK-16949] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider

2020-04-03 Thread GitBox
Myasuka commented on a change in pull request #11624: [FLINK-16949] Enhance 
AbstractStreamOperatorTestHarness to use customized TtlTimeProvider
URL: https://github.com/apache/flink/pull/11624#discussion_r403207707
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 ##
 @@ -258,6 +258,10 @@ protected OperatorStateBackend operatorStateBackend(
}
}
 
+   protected TtlTimeProvider getTtlTimeProvider() {
 
 Review comment:
   There is another reason why we do not change the constructor of 
`StreamTaskStateInitializerImpl`.
   The current `AbstractStreamOperatorTestHarness` is not created from a builder: 
once a new `AbstractStreamOperatorTestHarness` is created, its inner 
`streamTaskStateInitializer` has already been created with the default 
`TtlTimeProvider`. Even if we set a TTL time provider on the 
`AbstractStreamOperatorTestHarness` later, the inner 
`streamTaskStateInitializer` would not notice the changed TTL time provider 
unless we call `AbstractStreamOperatorTestHarness#setup` to re-create the inner 
`streamTaskStateInitializer`. 
   However, `AbstractStreamOperatorTestHarness#setup` actually calls the 
deprecated `SetupableStreamOperator#setup` interface. 
   In a nutshell, unless we refactor how we build 
`AbstractStreamOperatorTestHarness`, making the customized TTL time provider 
take effect requires calling `AbstractStreamOperatorTestHarness#setup` each time, 
which might already be treated as a deprecated interface.
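The ordering problem described above can be reduced to a few lines. In this sketch all names (`TimeProvider`, `StateInitializer`, `TestHarness`, `setTimeProvider`, `rebuildInitializer`) are hypothetical stand-ins for `TtlTimeProvider`, `StreamTaskStateInitializerImpl` and `AbstractStreamOperatorTestHarness`, not the real Flink classes. The harness builds its initializer in the constructor, so the initializer captures whichever provider was current at that moment, and a provider set later only takes effect once the initializer is rebuilt (the role `setup` plays in the real harness).

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for TtlTimeProvider. */
interface TimeProvider extends LongSupplier {}

/** Hypothetical stand-in for StreamTaskStateInitializerImpl: captures its provider at construction. */
class StateInitializer {
    private final TimeProvider timeProvider;

    StateInitializer(TimeProvider timeProvider) {
        this.timeProvider = timeProvider;
    }

    long currentTime() {
        return timeProvider.getAsLong();
    }
}

/** Hypothetical stand-in for AbstractStreamOperatorTestHarness. */
class TestHarness {
    private TimeProvider timeProvider = System::currentTimeMillis; // default provider
    private StateInitializer initializer;

    TestHarness() {
        // Built eagerly, so it captures the default provider.
        this.initializer = new StateInitializer(timeProvider);
    }

    void setTimeProvider(TimeProvider provider) {
        // Only updates the harness field; the existing initializer keeps the old provider.
        this.timeProvider = provider;
    }

    void rebuildInitializer() {
        // Plays the role of setup(): re-creates the initializer with the current provider.
        this.initializer = new StateInitializer(timeProvider);
    }

    long initializerTime() {
        return initializer.currentTime();
    }
}
```

The `protected getTtlTimeProvider()` hook in this diff appears to take a different route: instead of capturing the provider up front, the initializer asks an overridable method for it when needed. Which route is cleaner is the trade-off discussed in this thread.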




[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture

2020-04-03 Thread GitBox
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add 
docs for components and distributed architecture
URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403193426
 
 

 ##
 File path: docs/concepts/distributed_architecture.md
 ##
 @@ -0,0 +1,99 @@
+---
+title: Distributed Architecture 
+nav-id: dist-arch
+nav-pos: 2
+nav-title: Architecture
+nav-parent_id: concepts
+---
+
+
+A Stateful Functions deployment consists of a few components interacting 
together. Here we describe these pieces and their 
 
 Review comment:
   You're missing the second half of this sentence. 




[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture

2020-04-03 Thread GitBox
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add 
docs for components and distributed architecture
URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403198004
 
 

 ##
 File path: docs/concepts/distributed_architecture.md
 ##
 @@ -0,0 +1,99 @@
+---
+title: Distributed Architecture 
+nav-id: dist-arch
+nav-pos: 2
+nav-title: Architecture
+nav-parent_id: concepts
+---
+
+
+A Stateful Functions deployment consists of a few components interacting 
together. Here we describe these pieces and their 
+
+* This will be replaced by the TOC
+{:toc}
+
+## High-level View
+
+A *Stateful Functions* deployment consists of a set of **Apache Flink Stateful 
Functions** processes and, optionally, various deployments that execute remote 
functions.
+
+
+
+
+
+The Flink worker processes (TaskManagers) receive the events from the ingress 
systems (Kafka, Kinesis, etc.) and route them to the target functions. They 
invoke the functions, and route the resulting mesages to the next respective 
target functions. Messages designated for egress are written to an egress 
system (again, Kafka, Kinesis, ...).
 
 Review comment:
   ```suggestion
   The Flink worker processes (TaskManagers) receive the events from the 
ingress systems (Kafka, Kinesis, etc.) and route them to the target functions. 
They invoke the functions and route the resulting messages to the next 
respective target functions. Messages designated for egress are written to an 
egress system (again, Kafka, Kinesis, ...).
   ```
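To make the routing step concrete: each message is delivered to the worker that owns the target function's key. A minimal sketch, assuming a simple `hash(address) mod parallelism` assignment (Flink itself assigns keys to key groups using a murmur hash, so this is a simplification), with a hypothetical `Address` class standing in for a function type plus id and a hypothetical `KeyRouter` standing in for the TaskManagers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical target address: function type plus function id (the key). */
final class Address {
    final String functionType;
    final String id;

    Address(String functionType, String id) {
        this.functionType = functionType;
        this.id = id;
    }
}

/** Simplified router: every message for the same address goes to the same worker. */
final class KeyRouter {
    private final List<List<String>> inboxes = new ArrayList<>();

    KeyRouter(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            inboxes.add(new ArrayList<>());
        }
    }

    /** Picks a worker index from the address; floorMod keeps negative hashes in range. */
    int workerFor(Address target) {
        int hash = Objects.hash(target.functionType, target.id);
        return Math.floorMod(hash, inboxes.size());
    }

    /** Appends the message to the owning worker's inbox, in arrival order. */
    void route(Address target, String message) {
        inboxes.get(workerFor(target)).add(message);
    }

    List<String> inbox(int worker) {
        return inboxes.get(worker);
    }
}
```

Because `workerFor` depends only on the address, all messages for one function instance land in one inbox, which is what lets a single worker own that instance's state.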




[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture

2020-04-03 Thread GitBox
sjwiesman commented on a change in pull request #95: [FLINK-16977][docs] Add 
docs for components and distributed architecture
URL: https://github.com/apache/flink-statefun/pull/95#discussion_r403196811
 
 

 ##
 File path: docs/concepts/distributed_architecture.md
 ##
 @@ -0,0 +1,99 @@
+---
+title: Distributed Architecture 
+nav-id: dist-arch
+nav-pos: 2
+nav-title: Architecture
+nav-parent_id: concepts
+---
+
+
+A Stateful Functions deployment consists of a few components interacting 
together. Here we describe these pieces and their 
+
+* This will be replaced by the TOC
+{:toc}
+
+## High-level View
+
+A *Stateful Functions* deployment consists of a set of **Apache Flink Stateful 
Functions** processes and, optionally, various deployments that execute remote 
functions.
+
+
+
+
+
+The Flink worker processes (TaskManagers) receive the events from the ingress 
systems (Kafka, Kinesis, etc.) and route them to the target functions. They 
invoke the functions, and route the resulting mesages to the next respective 
target functions. Messages designated for egress are written to an egress 
system (again, Kafka, Kinesis, ...).
+
+## Components
+
+The heavy lifting is done by the Apache Flink processes, which manage the 
state, handle the messaging, and invoke the stateful functions.
+The Flink cluster consists typically of one master and multiple workers 
(TaskManagers).
+
+
+
+
+
+In addition to the Apache Flink processes, a full deployment requires 
[ZooKeeper](https://zookeeper.apache.org/) (for [master 
failover](https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html))
 and bulk storage (S3, HDFS, NAS, GCS, Azure Blob Store, etc.) to store Flink's 
[checkpoints](https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html#checkpointing).
 In turn, the deployment requires no database, and Flink processes do not 
require persistent volumes.
+
+## Logical Co-location, Physical Separation
+
+A core principle of many Stream Processors is that application logic and the 
application state must be co-located. That approach is the basis for their 
out-of-the box consistency. Stateful Function takes a unique approach to that 
by *logically co-locating* state and compute, but allowing to *physically 
separate* them.
+
+  - *Logical co-location:* Messaging, state access/updates and function 
invocations are managed tightly together, in the same way as in Flink's 
DataStream API. State is sharded by key, and messages are routed to the state 
by key. There is a single writer per key at a time, also scheduling the 
function invocations.
+
+  - *Physical separation:* Functions can be executed remotely, with message 
and state access provided as part of the invocation request. This way, 
functions can be managed independently, like stateless processes.
+
+
+## Deployment Styles for Functions
+
+The stateful functions themselves can be deployed in various ways that trade 
off certain properties with each other: loose coupling and independent scaling 
on the one hand with performance overhead on the other hand. Each module of 
functions can be of a different kind, so some functions can run remote, while 
others could run embedded.
+
+ Remote Functions
+
+*Remote Functions* use the above-mentioned principle of *physical separation* 
while maintaining *logical co-location*. The state/messaging tier (i.e., the 
Flink processes) and the function tier are deployed, managed, and scaled 
independently.
+
+Function invocations happen through an HTTP / gRPC protocol and go through a 
service that routes invocation requests to any available endpoint, for example 
a Kubernetes (load-balancing) service, the AWS request gateway for Lambda, etc. 
Because invocations are self-contained (contain message, state, access to 
timers, etc.) the target functions can treated like any stateless application.
+
+
+   
+
+
+
+Refer to the documentation on the [Python SDK]({{ site.baseurl 
}}/sdk/python.html) and [remote modules]({{ site.baseurl 
}}/sdk/modules.html#remote-module) for details. 
+
+ Co-located Functions
+
+An alternative way of deploying functions is *co-location* with the Flink JVM 
processes. In such a setup, each Flink TaskManager would talk to one Function 
process sitting *"next to it"*. A common way to do this is to use a system like 
Kubernetes and deploy pods consisting of a Flink container and the function 
side-car container; the two communicate via the pod-local network.
+
+This mode supports different languages while avoiding to route invocations 
through a Service/LoadBalancer, but it cannot scale the state and compute parts 
independently.
+
+
+   
+
+
+This style of deployment is similar to how Flink's Table API and API Beam's 
portability layer and deploy execute non-JVM functions.
 
 Review comment:
   ```suggestion
   This style of deployment is similar to how Flink's Table API and API Beam's 
portability 

[GitHub] [flink] bowenli86 commented on a change in pull request #11538: [FLINK-16813][jdbc] JDBCInputFormat doesn't correctly map Short

2020-04-03 Thread GitBox
bowenli86 commented on a change in pull request #11538: [FLINK-16813][jdbc]  
JDBCInputFormat doesn't correctly map Short
URL: https://github.com/apache/flink/pull/11538#discussion_r403184113
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/StringUtils.java
 ##
 @@ -139,8 +139,16 @@ public static String arrayToString(Object array) {
if (array instanceof long[]) {
return Arrays.toString((long[]) array);
}
-   if (array instanceof Object[]) {
-   return Arrays.toString((Object[]) array);
+   // for array of byte array
+   if (array instanceof byte[][]) {
 
 Review comment:
   Maybe it should be added later; I haven't seen a case for it yet. 
   
   We need `byte[][]` because PostgreSQL supports arrays of byte arrays. 
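Order matters in that method: a `byte[][]` is also an `Object[]`, so a generic `Object[]` branch checked first would catch it, and `Arrays.toString` would print each inner array as an identity string of the form `[B@<hash>`. A minimal sketch of the idea, as a standalone hypothetical `ArrayStrings.arrayToString` (not the actual code of `StringUtils`, whose full diff is not shown here), using `Arrays.deepToString` for the nested case:

```java
import java.util.Arrays;

/** Hypothetical standalone sketch; not Flink's StringUtils implementation. */
final class ArrayStrings {

    private ArrayStrings() {}

    static String arrayToString(Object array) {
        if (array instanceof byte[]) {
            return Arrays.toString((byte[]) array);
        }
        // Must be checked before Object[]: byte[][] is also an instance of Object[].
        if (array instanceof byte[][]) {
            // deepToString formats each nested byte[] instead of printing "[B@..."
            return Arrays.deepToString((Object[]) array);
        }
        if (array instanceof Object[]) {
            return Arrays.toString((Object[]) array);
        }
        throw new IllegalArgumentException("Array expected");
    }
}
```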




[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)
 
   
   


[GitHub] [flink] flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make 
FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#issuecomment-599949839
 
 
   
   ## CI report:
   
   * e5e11418358bf450d1cca543916bbd7d695375b1 UNKNOWN
   * ad481f5d846621032feb21e409690eed5b114191 UNKNOWN
   * 2a76c7689c032056a462a615c28f27f1361f1f0e Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158236401) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7045)
 
   
   


[jira] [Created] (FLINK-16978) Update flink-statefun README.md for Stateful Functions 2.0.

2020-04-03 Thread Marta Paes Moreira (Jira)
Marta Paes Moreira created FLINK-16978:
--

 Summary: Update flink-statefun README.md for Stateful Functions 
2.0.
 Key: FLINK-16978
 URL: https://issues.apache.org/jira/browse/FLINK-16978
 Project: Flink
  Issue Type: Task
  Components: Stateful Functions
Reporter: Marta Paes Moreira
Assignee: Marta Paes Moreira


Update the README in the repository to reflect the changes in Stateful Functions 
2.0.

Will also update the README in the statefun-python-sdk directory.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-9173) RestClient - Received response is abnormal

2020-04-03 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074738#comment-17074738
 ] 

François Lacombe edited comment on FLINK-9173 at 4/3/20, 5:06 PM:
--

This issue still occurs with Flink 1.10 and Spring Boot 2.2.5.

I had to deal with it recently, and colleagues of mine found this workaround:

The root cause is that the wrong classloader gets selected, due to a flaw in 
the Spring Boot packaging.

In my pom.xml, I replaced:
{code:xml}
<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
{code}
 

by:
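{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <executions>
    <execution>
      <id>copy-dependencies</id>
      <phase>prepare-package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>${project.build.directory}/libs</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <classpathPrefix>libs/</classpathPrefix>
        <mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
{code}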

 

This produces a jar file with its dependencies in a separate libs/ directory instead of a fat jar.
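One way to confirm that the `maven-jar-plugin` settings took effect is to read the built jar's manifest: with `addClasspath` and `classpathPrefix` set, its `Class-Path` attribute should list the copied dependencies under `libs/`. A minimal sketch using only `java.util.jar`; the `ManifestCheck` class name and the jar path you pass in are placeholders.

```java
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Hypothetical helper: lists the Class-Path entries from a jar's manifest. */
final class ManifestCheck {

    private ManifestCheck() {}

    /** Returns the space-separated Class-Path entries, or an empty list if there are none. */
    static List<String> classPathEntries(File jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return Collections.emptyList();
            }
            String classPath = manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
            if (classPath == null || classPath.trim().isEmpty()) {
                return Collections.emptyList();
            }
            return Arrays.asList(classPath.trim().split("\\s+"));
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ManifestCheck target/your-app.jar
        for (String entry : classPathEntries(new File(args[0]))) {
            System.out.println(entry);
        }
    }
}
```

If the list is empty, the jar is presumably still being packaged by another plugin, and `java -jar` will not find the dependencies in `libs/`.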

 

I hope this helps, and that the issue gets fixed in a future release.

All the best

 



> RestClient - Received response is abnormal
> --
>
> Key: FLINK-9173
> URL: https://issues.apache.org/jira/browse/FLINK-9173
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.5.0
> Environment: OS:    CentOS 6.8
> JAVA:    1.8.0_161-b12
> maven-plugin:     spring-boot-maven-plugin
> Spring-boot:      1.5.10.RELEASE
>Reporter: Bob Lau
>Priority: Major
> Attachments: image-2018-04-17-14-09-33-991.png
>
>
> The system prints the exception log as follows:
>  
> {code:java}
> //code placeholder
> 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR 
> o.a.flink.runtime.rest.RestClient - Received response was neither of the 
> expected type ([simple type, class 
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) 
> nor an error. 
> Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
>  Unrecognized field "status" (class 
> org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as 
> ignorable (one known property: "errors"])
> at [Source: N/A; line: -1, column: -1] (through reference chain: 
> org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
> at 
> 

[jira] [Commented] (FLINK-9173) RestClient - Received response is abnormal

2020-04-03 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074738#comment-17074738
 ] 

François Lacombe commented on FLINK-9173:
-

This issue still occurs with Flink 1.10 and Spring Boot 2.2.5.

I had to deal with it recently, and colleagues of mine found this workaround:

The root cause is that the wrong classloader gets selected, due to a flaw in 
the Spring Boot packaging.

In my pom.xml, I replaced:
{code:xml}
<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
{code}
 

by:

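{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <executions>
    <execution>
      <id>copy-dependencies</id>
      <phase>prepare-package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>${project.build.directory}/libs</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <classpathPrefix>libs/</classpathPrefix>
        <mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
{code}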

 

This produces a jar file with its dependencies in a separate libs/ directory instead of a fat jar.

 

I hope this helps, and that the issue gets fixed in a future release.

All the best

 

> RestClient - Received response is abnormal
> --
>
> Key: FLINK-9173
> URL: https://issues.apache.org/jira/browse/FLINK-9173
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.5.0
> Environment: OS:    CentOS 6.8
> JAVA:    1.8.0_161-b12
> maven-plugin:     spring-boot-maven-plugin
> Spring-boot:      1.5.10.RELEASE
>Reporter: Bob Lau
>Priority: Major
> Attachments: image-2018-04-17-14-09-33-991.png
>
>
> The system prints the exception log as follows:
>  
> {code:java}
> //code placeholder
> 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR 
> o.a.flink.runtime.rest.RestClient - Received response was neither of the 
> expected type ([simple type, class 
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) 
> nor an error. 
> Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
>  Unrecognized field "status" (class 
> org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as 
> ignorable (one known property: "errors"])
> at [Source: N/A; line: -1, column: -1] (through reference chain: 
> org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3779)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2050)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2547)
> at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
> at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:210)
> at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  In the development 

[GitHub] [flink-statefun] StephanEwen opened a new pull request #95: [FLINK-16977][docs] Add docs for components and distributed architecture

2020-04-03 Thread GitBox
StephanEwen opened a new pull request #95: [FLINK-16977][docs] Add docs for 
components and distributed architecture
URL: https://github.com/apache/flink-statefun/pull/95
 
 
   This adds documentation for the components of a distributed deployment.
   
   
![image](https://user-images.githubusercontent.com/1727146/78386265-8b173e80-75dd-11ea-9dfb-856f29279159.png)
   
   
![image](https://user-images.githubusercontent.com/1727146/78386302-a124ff00-75dd-11ea-9726-a6e292b26079.png)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16977) Add Architecture docs for Stateful Functions

2020-04-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16977:
---
Labels: pull-request-available  (was: )

> Add Architecture docs for Stateful Functions
> 
>
> Key: FLINK-16977
> URL: https://issues.apache.org/jira/browse/FLINK-16977
> Project: Flink
>  Issue Type: Improvement
>  Components: Stateful Functions
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 2.1.0
>
>
> We should add a section to the documentation describing the distributed setup 
> and architecture of Stateful Functions applications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16977) Add Architecture docs for Stateful Functions

2020-04-03 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-16977:


 Summary: Add Architecture docs for Stateful Functions
 Key: FLINK-16977
 URL: https://issues.apache.org/jira/browse/FLINK-16977
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 2.1.0, 2.0.0


We should add a section to the documentation describing the distributed setup 
and architecture of Stateful Functions applications.





[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * f128770ed2a5816a1f460734f736fd07b273e896 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043)
 
   * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158256732) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7047)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403089669
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
+   final TaskExecutorProcessSpec taskExecutorProcessSpec =
+   
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
 
final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);
 
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
+
final String dynamicProperties =

BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-   final KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+   return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);
-
-   final KubernetesPod taskManagerPod =
-   
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-   log.info("TaskManager {} will be started with {}.", podName, 
taskExecutorProcessSpec);
-   kubeClient.createTaskManagerPod(taskManagerPod);
}
 
/**
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
-   private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagers = 
getNumberRequiredTaskManagers();
+   private void requestKubernetesPodIfRequired(WorkerResourceSpec 
workerResourceSpec) {
+   final int requiredTaskManagers = 
getPendingWorkerNums().get(workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.getNum(workerResourceSpec);
 
 Review comment:
   I think I would hide `pendingWorkerCounter` behind some methods which the 
base class provides.
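One way to read this suggestion is that the base class owns the per-spec pending counter and exposes only narrow operations: "a worker was requested", "a worker started", and "how many are pending". Subclasses such as `KubernetesResourceManager` would then never touch the counter directly. The self-contained sketch below follows that idea. `BaseResourceManager`, `notifyNewWorkerRequested`, `notifyNewWorkerStarted`, `getNumPendingWorkersFor`, and `PodRequestingManager` are hypothetical names, not Flink API, and a plain `String` stands in for `WorkerResourceSpec`.

```java
import java.util.HashMap;
import java.util.Map;

public class PendingWorkerSketch {

    /** Hypothetical base class that owns the pending-worker bookkeeping. */
    abstract static class BaseResourceManager<S> {
        // Workers requested but not yet started, keyed by resource spec.
        private final Map<S, Integer> pendingWorkers = new HashMap<>();

        /** Records a new request for {@code spec}; returns the updated pending count. */
        protected final int notifyNewWorkerRequested(S spec) {
            return pendingWorkers.merge(spec, 1, Integer::sum);
        }

        /** Records that a pending worker for {@code spec} started; returns the updated count. */
        protected final int notifyNewWorkerStarted(S spec) {
            int current = getNumPendingWorkersFor(spec);
            if (current == 0) {
                throw new IllegalStateException("No pending worker for " + spec);
            }
            if (current == 1) {
                pendingWorkers.remove(spec);
            } else {
                pendingWorkers.put(spec, current - 1);
            }
            return current - 1;
        }

        /** Read-only query; subclasses cannot mutate the counter through it. */
        protected final int getNumPendingWorkersFor(S spec) {
            return pendingWorkers.getOrDefault(spec, 0);
        }
    }

    /** Hypothetical Kubernetes-style subclass that only uses the base-class methods. */
    static class PodRequestingManager extends BaseResourceManager<String> {
        int podsRequested = 0;

        void requestPodIfRequired(String spec, int requiredWorkers) {
            if (requiredWorkers > getNumPendingWorkersFor(spec)) {
                notifyNewWorkerRequested(spec);
                podsRequested++; // stands in for kubeClient.createTaskManagerPod(...)
            }
        }

        void onPodStarted(String spec) {
            notifyNewWorkerStarted(spec);
        }
    }
}
```

With this design, the invariant "pending never drops below zero" lives in one place instead of being re-implemented in each resource manager.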




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403128803
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -540,39 +571,41 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
/**
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
-   private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-   if (requiredTaskManagers > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   private void requestYarnContainerIfRequired(Resource containerResource) 
{
+   getPendingWorkerNums().entrySet().stream()
+   .filter(entry ->
+   
getContainerResource(entry.getKey()).equals(containerResource) &&
+   entry.getValue() > 
pendingWorkerCounter.getNum(entry.getKey()))
+   .findAny()
 
 Review comment:
   Shouldn't we do this for all instead of any?
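Here is a minimal sketch of one reading of "all instead of any". Instead of letting `findAny()` trigger a single request, it sums the shortfall over every spec that maps to the given container resource. The maps and the mapping function are hypothetical stand-ins for `getPendingWorkerNums()`, `pendingWorkerCounter`, and `getContainerResource(...)` from the diff; their real types and semantics may differ.

```java
import java.util.Map;
import java.util.function.Function;

public class ContainerShortfallSketch {

    /**
     * Counts how many containers of {@code containerResource} are still missing,
     * summed over ALL specs that map to that resource.
     *
     * @param requiredWorkers     spec -> workers needed (role of getPendingWorkerNums())
     * @param pendingWorkers      spec -> requests already in flight (role of pendingWorkerCounter)
     * @param toContainerResource spec -> container resource (role of getContainerResource(spec))
     */
    static <S, R> int missingContainers(
            R containerResource,
            Map<S, Integer> requiredWorkers,
            Map<S, Integer> pendingWorkers,
            Function<S, R> toContainerResource) {
        int missing = 0;
        for (Map.Entry<S, Integer> entry : requiredWorkers.entrySet()) {
            if (!containerResource.equals(toContainerResource.apply(entry.getKey()))) {
                continue; // this spec maps to a different container size
            }
            int inFlight = pendingWorkers.getOrDefault(entry.getKey(), 0);
            missing += Math.max(0, entry.getValue() - inFlight);
        }
        return missing;
    }
}
```

Suppose two specs map to the same container size and need 1 and 3 more workers respectively. This version yields 4, while the `findAny()` version only detects that at least one container is missing.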




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403115740
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainerStatus.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * A {@link ContainerStatus} implementation for testing.
+ */
+class TestingContainerStatus extends ContainerStatus {
+
+   private final ContainerId containerId;
+   private final ContainerState containerState;
+   private final String diagnostics;
+   private final int exitStatus;
+
+   TestingContainerStatus(
+   final ContainerId containerId,
+   final ContainerState containerState,
+   final String diagnostics,
+   final int exitStatus) {
+
+   this.containerId = containerId;
+   this.containerState = containerState;
+   this.diagnostics = diagnostics;
+   this.exitStatus = exitStatus;
+   }
+
+   @Override
+   public ContainerId getContainerId() {
+   return containerId;
+   }
+
+   @Override
+   public void setContainerId(ContainerId containerId) {
+
 
 Review comment:
   Shouldn't we fail in case someone calls `setContainerId` here? Otherwise it 
might go unnoticed and result in some strange behaviour/failure.
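A sketch of the fail-fast alternative follows. A hypothetical `Status`/`TestingStatus` pair stands in for Hadoop's `ContainerStatus` so the example compiles without Hadoop on the classpath. The testing subclass keeps its values immutable and throws `UnsupportedOperationException` from the setter instead of silently ignoring the call.

```java
public class FailingSetterSketch {

    /** Hypothetical stand-in for an abstract Hadoop record such as ContainerStatus. */
    abstract static class Status {
        public abstract String getContainerId();

        public abstract void setContainerId(String containerId);
    }

    /** Immutable testing implementation: getters return fixed values, setters fail loudly. */
    static final class TestingStatus extends Status {
        private final String containerId;

        TestingStatus(String containerId) {
            this.containerId = containerId;
        }

        @Override
        public String getContainerId() {
            return containerId;
        }

        @Override
        public void setContainerId(String containerId) {
            // Failing here makes an unexpected mutation visible in the test output.
            throw new UnsupportedOperationException(
                    "TestingStatus is immutable; setContainerId is not supported.");
        }
    }
}
```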




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403121941
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+
+import java.util.function.Consumer;
+
+/**
+ * A Yarn {@link NMClientAsync} implementation for testing.
+ */
+class TestingYarnNMClientAsync extends NMClientAsyncImpl {
+
+   private Consumer> startContainerAsyncConsumer = ignored -> {};
+   private Consumer> 
stopContainerAsyncConsumer = ignored -> {};
 
 Review comment:
   I'd suggest using the `TriConsumer` here.
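To illustrate the difference: a `Consumer` over a tuple forces every test callback to build and unpack a `Tuple3`, whereas a three-argument consumer receives the values directly. The comment refers to a `TriConsumer`. The sketch below declares a local interface with the same `accept(a, b, c)` shape so that it compiles on its own. `TestingClient`, `startContainer`, and its parameters are hypothetical and do not match the signatures of `NMClientAsync`.

```java
public class TriConsumerSketch {

    /** Local stand-in with a three-argument accept(a, b, c). */
    @FunctionalInterface
    interface TriConsumer<A, B, C> {
        void accept(A a, B b, C c);
    }

    /** Hypothetical testing client whose callback takes three plain arguments, no tuple. */
    static class TestingClient {
        private TriConsumer<String, String, Integer> startContainerConsumer =
                (containerId, nodeId, attempt) -> {};

        void setStartContainerConsumer(TriConsumer<String, String, Integer> consumer) {
            this.startContainerConsumer = consumer;
        }

        void startContainer(String containerId, String nodeId, int attempt) {
            // With Consumer<Tuple3<...>> this would be consumer.accept(Tuple3.of(...)).
            startContainerConsumer.accept(containerId, nodeId, attempt);
        }
    }
}
```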




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403132471
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -540,39 +571,41 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
/**
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
-   private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-   if (requiredTaskManagers > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   private void requestYarnContainerIfRequired(Resource containerResource) 
{
 
 Review comment:
   Shouldn't we iterate over all resources which are needed instead of 
restricting it to `containerResource`?
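Iterating over all needed resources could be sketched as follows. The method computes the missing container count for every container resource that any required spec maps to, instead of only for the single `containerResource` passed in. This means a shortfall for one container size is not overlooked just because the call was triggered by another size. The parameters are hypothetical stand-ins for `getPendingWorkerNums()`, `pendingWorkerCounter`, and `getContainerResource(...)` from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class AllResourcesShortfallSketch {

    /**
     * Returns container resource -> number of containers still missing, covering
     * every resource that some required spec maps to. Resources with no shortfall
     * are omitted from the result.
     */
    static <S, R> Map<R, Integer> missingContainersPerResource(
            Map<S, Integer> requiredWorkers,
            Map<S, Integer> pendingWorkers,
            Function<S, R> toContainerResource) {
        Map<R, Integer> missing = new HashMap<>();
        requiredWorkers.forEach((spec, required) -> {
            int gap = required - pendingWorkers.getOrDefault(spec, 0);
            if (gap > 0) {
                // Several specs may map to the same container resource; add their gaps up.
                missing.merge(toContainerResource.apply(spec), gap, Integer::sum);
            }
        });
        return missing;
    }
}
```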




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403133265
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactoryTest.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerFactory}.
+ */
+public class YarnResourceManagerFactoryTest {
 
 Review comment:
   ```suggestion
   public class YarnResourceManagerFactoryTest extends TestLogger {
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403090988
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
+   final TaskExecutorProcessSpec taskExecutorProcessSpec =
+   
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
 
final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);
 
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
+
final String dynamicProperties =

BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-   final KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+   return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);
-
-   final KubernetesPod taskManagerPod =
-   
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-   log.info("TaskManager {} will be started with {}.", podName, 
taskExecutorProcessSpec);
-   kubeClient.createTaskManagerPod(taskManagerPod);
}
 
/**
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
-   private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagers = 
getNumberRequiredTaskManagers();
+   private void requestKubernetesPodIfRequired(WorkerResourceSpec 
workerResourceSpec) {
+   final int requiredTaskManagers = 
getPendingWorkerNums().get(workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.getNum(workerResourceSpec);
 
-   if (requiredTaskManagers > numPendingPodRequests) {
-   requestKubernetesPod();
+   if (requiredTaskManagers > pendingWorkerNum) {
+   requestKubernetesPod(workerResourceSpec);
}
}
 
private void removePodIfTerminated(KubernetesPod pod) {
if (pod.isTerminated()) {
kubeClient.stopPod(pod.getName());
-   final KubernetesWorkerNode kubernetesWorkerNode = 
workerNodes.remove(new ResourceID(pod.getName()));
-   if (kubernetesWorkerNode != null) {
-   requestKubernetesPodIfRequired();
+   final WorkerResourceSpec workerResourceSpec = 
removeWorkerNodeAndResourceSpec(new ResourceID(pod.getName()));
+   if (workerResourceSpec != null) {
+   
requestKubernetesPodIfRequired(workerResourceSpec);
 

[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403118223
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl {
 
 Review comment:
   `AMRMClientAsyncImpl` is annotated as unstable. I'm not sure how the 
testing implementation will work across different Yarn versions. Have we tried this 
out?




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403105830
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration 
configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link 
WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
 
 Review comment:
   If we are interested in every `WorkerResourceSpec` that ever resulted in 
a given `Resource`, then I would suggest changing the value type to `List`. 
Otherwise one could instantiate this field with a `Set`, which has different 
semantics.
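The concern is that a `Collection`-typed value leaves the semantics to whoever constructs the map. The self-contained sketch below records the same spec twice for one resource. It shows that a `List` keeps both entries while a `Set` collapses them, even though both fit the same declared type. Class and method names are hypothetical, and `String` stands in for both `WorkerResourceSpec` and `Resource`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.function.Supplier;

public class CollectionSemanticsSketch {

    /**
     * Records which specs produced which container resource. Because the value type
     * is Collection, the caller-supplied factory silently decides the semantics.
     */
    static Map<String, Collection<String>> index(
            String[][] specResourcePairs, Supplier<Collection<String>> valueFactory) {
        Map<String, Collection<String>> resourceToSpecs = new HashMap<>();
        for (String[] pair : specResourcePairs) {
            String spec = pair[0];
            String resource = pair[1];
            resourceToSpecs.computeIfAbsent(resource, r -> valueFactory.get()).add(spec);
        }
        return resourceToSpecs;
    }

    public static void main(String[] args) {
        String[][] pairs = {{"spec-a", "R1"}, {"spec-a", "R1"}, {"spec-b", "R1"}};
        System.out.println(index(pairs, ArrayList::new).get("R1"));     // [spec-a, spec-a, spec-b]
        System.out.println(index(pairs, LinkedHashSet::new).get("R1")); // [spec-a, spec-b]
    }
}
```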




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403113398
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainer.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+
+/**
+ * A {@link Container} implementation for testing.
+ */
+class TestingContainer extends Container {
 
 Review comment:
   I like the idea of getting rid of Mockito, but how do we ensure that this 
works with all Hadoop versions? Looking at Hadoop 2.10, 
https://github.com/apache/hadoop/blob/release-2.10.0-RC1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
 it looks as if `Container` has gained some more methods.
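If `TestingContainer` is compiled against a given Hadoop version, javac already rejects it when an abstract method of `Container` is left unimplemented. Building the tests against each supported Hadoop version is therefore the most direct check. A different case arises when a class compiled against one version is loaded with another. There, a reflective check like the sketch below can list public methods that are still abstract at runtime. `Record`, `OutdatedTestingRecord`, `CompleteTestingRecord`, and `getAddedInNewerVersion` are hypothetical stand-ins. The outdated subclass is declared abstract only because javac would not accept it otherwise.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AbstractMethodCheckSketch {

    /**
     * Returns the sorted names of public methods that are still abstract on {@code impl},
     * as seen by the classes actually loaded at runtime. Calling any of them on an
     * instance would throw AbstractMethodError.
     */
    static List<String> unimplementedPublicMethods(Class<?> impl) {
        List<String> missing = new ArrayList<>();
        for (Method method : impl.getMethods()) { // public methods, including inherited ones
            if (Modifier.isAbstract(method.getModifiers())) {
                missing.add(method.getName());
            }
        }
        Collections.sort(missing);
        return missing;
    }

    /** Hypothetical stand-in for an abstract record class such as Hadoop's Container. */
    public abstract static class Record {
        public abstract String getId();

        public abstract String getAddedInNewerVersion(); // pretend a newer release added this
    }

    /** Simulates a testing subclass written against the older base class. */
    public abstract static class OutdatedTestingRecord extends Record {
        @Override
        public String getId() {
            return "container_01";
        }
    }

    /** A testing subclass that implements every abstract method. */
    public static class CompleteTestingRecord extends Record {
        @Override
        public String getId() {
            return "container_01";
        }

        @Override
        public String getAddedInNewerVersion() {
            return "value";
        }
    }
}
```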




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403088133
 
 

 ##
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+   final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated
 
 Review comment:
   Should we add a `checkState` to ensure that we fail in case we request a 
different size?
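A minimal sketch of such a fail-fast check, assuming that for now only the single default worker size derived from the configuration is supported. `SimpleWorkerSpec` and `DefaultSpecGuard` are hypothetical stand-ins (not Flink classes), and the local `checkState` helper only imitates the style of Flink's `Preconditions.checkState`.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for Flink's WorkerResourceSpec. */
final class SimpleWorkerSpec {
    final int memoryMb;
    final double cpuCores;

    SimpleWorkerSpec(int memoryMb, double cpuCores) {
        this.memoryMb = memoryMb;
        this.cpuCores = cpuCores;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleWorkerSpec)) return false;
        SimpleWorkerSpec that = (SimpleWorkerSpec) o;
        return memoryMb == that.memoryMb && Double.compare(cpuCores, that.cpuCores) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(memoryMb, cpuCores);
    }

    @Override
    public String toString() {
        return "SimpleWorkerSpec{memoryMb=" + memoryMb + ", cpuCores=" + cpuCores + "}";
    }
}

/** Hypothetical guard: rejects any requested spec other than the configured default. */
final class DefaultSpecGuard {
    private final SimpleWorkerSpec defaultSpec;

    DefaultSpecGuard(SimpleWorkerSpec defaultSpec) {
        this.defaultSpec = Objects.requireNonNull(defaultSpec);
    }

    /** Local helper in the spirit of Flink's Preconditions.checkState (%s placeholders). */
    static void checkState(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalStateException(String.format(template, args));
        }
    }

    /** Fails fast as long as dynamic worker resources are not supported. */
    void checkRequestedSpec(SimpleWorkerSpec requested) {
        checkState(defaultSpec.equals(requested),
            "Requested %s, but only the default %s is supported until dynamic worker resources are enabled.",
            requested, defaultSpec);
    }
}
```

Calling such a check at the start of `createKubernetesTaskManagerParameters` would turn a silently mismatched pod size into an immediate `IllegalStateException`.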


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403078672
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerTest.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest {
 
 Review comment:
   ```suggestion
   public class ActiveResourceManagerTest extends TestLogger {
   ```


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403126015
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -363,31 +356,64 @@ public void onContainersCompleted(final List<ContainerStatus> statuses) {
@Override
public void onContainersAllocated(List<Container> containers) {
runAsync(() -> {
-   log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests);
-   final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
-   final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();
+   log.info("Received {} containers.", containers.size());
 
-   // number of allocated containers can be larger than the number of pending container requests
-   final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests);
-   final List<Container> requiredContainers = containers.subList(0, numAcceptedContainers);
-   final List<Container> excessContainers = containers.subList(numAcceptedContainers, containers.size());
-
-   for (int i = 0; i < requiredContainers.size(); i++) {
-   removeContainerRequest(pendingRequestsIterator.next());
-   }
-
-   excessContainers.forEach(this::returnExcessContainer);
-   requiredContainers.forEach(this::startTaskExecutorInContainer);
+   groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated);
 
// if we are waiting for no further containers, we can go to the
// regular heartbeat interval
-   if (numPendingContainerRequests <= 0) {
+   if (pendingWorkerCounter.getTotalNum() <= 0) {
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}
});
}
 
-   private void startTaskExecutorInContainer(Container container) {
+   private Map<Resource, List<Container>> groupContainerByResource(List<Container> containers) {
+   return containers.stream().collect(Collectors.groupingBy(Container::getResource));
+   }
+
+   private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
+   final List<WorkerResourceSpec> pendingWorkerResourceSpecs =
+   workerSpecContainerResourceAdapter.getWorkerSpecs(resource).stream()
 
 Review comment:
   Can it happen that `getWorkerSpecs(resource)` returns a list which contains 
a `WorkerResourceSpec` twice?
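Whether this can happen depends on how the two maps are populated. The sketch below only illustrates the mechanics: flattening the per-resource spec lists of several equivalent resources with `Collectors.toList()` keeps any repeated spec, whereas collecting into a `LinkedHashSet` drops repeats while keeping the first-seen order. Resources and specs are plain strings here, as hypothetical stand-ins for `Resource` and `WorkerResourceSpec`.

```java
import java.util.*;
import java.util.stream.Collectors;

final class DuplicateSpecDemo {
    /** Same shape as getWorkerSpecs: flatMap the specs of every equivalent resource into a list. */
    static List<String> specsAsList(Collection<String> equivalentResources,
                                    Map<String, List<String>> resourceToSpecs) {
        return equivalentResources.stream()
            .flatMap(r -> resourceToSpecs.getOrDefault(r, Collections.emptyList()).stream())
            .collect(Collectors.toList());
    }

    /** Same traversal, but repeated specs collapse into one entry (first-seen order kept). */
    static Set<String> specsAsSet(Collection<String> equivalentResources,
                                  Map<String, List<String>> resourceToSpecs) {
        return equivalentResources.stream()
            .flatMap(r -> resourceToSpecs.getOrDefault(r, Collections.emptyList()).stream())
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```

If `specA` is reachable from two equivalent resources, the list variant returns it twice and the set variant once.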


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403099591
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map<WorkerResourceSpec, Resource> workerSpecToContainerResource;
+   private final Map<Resource, List<WorkerResourceSpec>> containerResourceToWorkerSpecs;
+   private final Map<Integer, List<Resource>> containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final boolean matchVcores) {
+   this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.matchVcores = matchVcores;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) {
+   return workerSpecToContainerResource.computeIfAbsent(
+   Preconditions.checkNotNull(workerResourceSpec),
+   this::createAndMapContainerResource);
+   }
+
+   @VisibleForTesting
+   Collection<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource) {
+   return getEquivalentContainerResource(containerResource).stream()
+   .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream())
+   .collect(Collectors.toList());
+   }
+
+   @VisibleForTesting
+   Collection<Resource> getEquivalentContainerResource(final Resource containerResource) {
+   // Yarn might ignore the requested vcores, depending on its configurations.
+   // In such cases, we should also not match vcores.
+   return matchVcores ?
+   Collections.singletonList(containerResource) :
+   containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), Collections.emptyList());
+   }
+
+   private Resource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated
 
 Review comment:
   I would suggest adding a `checkState` to ensure that we fail once we enable 
dynamic worker resources.


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403130352
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
/**
 * Request new container if pending containers cannot satisfy pending slot requests.
 */
-   private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-   if (requiredTaskManagers > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   private void requestYarnContainerIfRequired(Resource containerResource) {
+   getPendingWorkerNums().entrySet().stream()
 
 Review comment:
   I have to admit that I find `getPendingWorkerNums()` and 
`pendingWorkerCounter` quite confusing. They sound almost the same, but the 
former refers to the requirements of the `SlotManager` and the latter to the 
currently pending workers which have been requested by the RM.
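A sketch of the per-spec arithmetic under that reading, using deliberately distinct, hypothetical names: `requiredBySlotManager` for what the `SlotManager` needs and `pendingRequestedByRm` for what the RM has already asked for but not yet received. Specs are plain strings for brevity; the RM would only request the positive difference.

```java
import java.util.HashMap;
import java.util.Map;

final class WorkerRequestPlanner {
    /**
     * For every worker spec, returns how many additional workers would still have to be requested:
     * what the SlotManager requires minus what the RM has already requested but not yet received.
     * Specs that are already fully covered are omitted from the result.
     */
    static Map<String, Integer> workersToRequest(Map<String, Integer> requiredBySlotManager,
                                                 Map<String, Integer> pendingRequestedByRm) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, Integer> required : requiredBySlotManager.entrySet()) {
            int pending = pendingRequestedByRm.getOrDefault(required.getKey(), 0);
            int missing = required.getValue() - pending;
            if (missing > 0) {
                result.put(required.getKey(), missing);
            }
        }
        return result;
    }
}
```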


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403104873
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map<WorkerResourceSpec, Resource> workerSpecToContainerResource;
+   private final Map<Resource, List<WorkerResourceSpec>> containerResourceToWorkerSpecs;
+   private final Map<Integer, List<Resource>> containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final boolean matchVcores) {
+   this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.matchVcores = matchVcores;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) {
+   return workerSpecToContainerResource.computeIfAbsent(
+   Preconditions.checkNotNull(workerResourceSpec),
+   this::createAndMapContainerResource);
+   }
+
+   @VisibleForTesting
+   Collection<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource) {
+   return getEquivalentContainerResource(containerResource).stream()
+   .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream())
+   .collect(Collectors.toList());
+   }
+
+   @VisibleForTesting
+   Collection<Resource> getEquivalentContainerResource(final Resource containerResource) {
 
 Review comment:
   If we are only interested in the equivalence class, then I would suggest 
changing the type from `Collection` to `Set`.
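A sketch of what a `Set`-based equivalence class could look like, assuming two container sizes are equal when memory and vcores match. `ContainerSize` and `EquivalenceIndex` are hypothetical stand-ins for YARN's `Resource` and the adapter; because the index stores sets, registering the same size twice cannot create a duplicate entry.

```java
import java.util.*;

/** Hypothetical stand-in for YARN's Resource: memory in MB plus vcores. */
final class ContainerSize {
    final int memoryMb;
    final int vcores;

    ContainerSize(int memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ContainerSize)) return false;
        ContainerSize other = (ContainerSize) o;
        return memoryMb == other.memoryMb && vcores == other.vcores;
    }

    @Override
    public int hashCode() {
        return Objects.hash(memoryMb, vcores);
    }
}

/** Hypothetical index from memory size to the set of registered container sizes. */
final class EquivalenceIndex {
    private final boolean matchVcores;
    private final Map<Integer, Set<ContainerSize>> sizesByMemory = new HashMap<>();

    EquivalenceIndex(boolean matchVcores) {
        this.matchVcores = matchVcores;
    }

    void register(ContainerSize size) {
        sizesByMemory.computeIfAbsent(size.memoryMb, k -> new HashSet<>()).add(size);
    }

    /** Equivalence class of an allocated size: itself, or every registered size with equal memory. */
    Set<ContainerSize> equivalentSizes(ContainerSize allocated) {
        if (matchVcores) {
            return Collections.singleton(allocated);
        }
        return Collections.unmodifiableSet(
            sizesByMemory.getOrDefault(allocated.memoryMb, Collections.emptySet()));
    }
}
```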


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403086625
 
 

 ##
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode worker) {
@Override
public void onAdded(List<KubernetesPod> pods) {
runAsync(() -> {
-   for (KubernetesPod pod : pods) {
-   if (numPendingPodRequests > 0) {
-   numPendingPodRequests--;
+   pods.forEach(pod -> {
+   WorkerResourceSpec workerResourceSpec = podWorkerResources.get(pod.getName());
+   final int pendingNum = pendingWorkerCounter.getNum(workerResourceSpec);
+   if (pendingNum > 0) {
+   pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   I'm wondering whether this logic shouldn't go into the 
`ActiveResourceManager`. I would expect that all `ActiveResourceManager` 
implementations would need to do something similar. Maybe we could introduce 
`notifyNewWorkerStarted(WorkerResourceSpec)`. This could also have the benefit 
that we could hide `pendingWorkerCounter` completely.
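A minimal sketch of that idea, assuming the counter is keyed by the worker spec. `PendingWorkerTrackingResourceManager`, `notifyNewWorkerRequested`, `getNumPendingWorkersFor` and `ExampleResourceManager` are hypothetical names; `notifyNewWorkerStarted` follows the suggestion above. The map is private, so subclasses can only go through the notification methods.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base class that owns the pending-worker bookkeeping. */
abstract class PendingWorkerTrackingResourceManager<S> {
    private final Map<S, Integer> pendingWorkers = new HashMap<>();

    /** Called by subclasses right after asking the external system for a new worker. */
    protected final int notifyNewWorkerRequested(S spec) {
        return pendingWorkers.merge(spec, 1, Integer::sum);
    }

    /** Called by subclasses when a requested worker came up; unknown specs are ignored. */
    protected final int notifyNewWorkerStarted(S spec) {
        Integer current = pendingWorkers.get(spec);
        if (current == null) {
            return 0; // not requested by us, or already accounted for
        }
        if (current == 1) {
            pendingWorkers.remove(spec);
            return 0;
        }
        pendingWorkers.put(spec, current - 1);
        return current - 1;
    }

    protected final int getNumPendingWorkersFor(S spec) {
        return pendingWorkers.getOrDefault(spec, 0);
    }
}

/** Hypothetical subclass showing where requestKubernetesPod/onAdded would call in. */
final class ExampleResourceManager extends PendingWorkerTrackingResourceManager<String> {
    void requestWorker(String spec) {
        notifyNewWorkerRequested(spec);
        // ... create the pod / container request here ...
    }

    void onWorkerAdded(String spec) {
        notifyNewWorkerStarted(spec);
        // ... start the TaskExecutor here ...
    }

    int pending(String spec) {
        return getNumPendingWorkersFor(spec);
    }
}
```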


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403097275
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
 
 Review comment:
   I think this class is large enough to warrant its own file. This would also 
decrease the size of this source code file a bit.


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403131968
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId
 
final ResourceID resourceId = new ResourceID(containerId.toString());
// release the failed container
-   workerNodeMap.remove(resourceId);
+   YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
-   requestYarnContainerIfRequired();
+   requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource());
}
 
private void returnExcessContainer(Container excessContainer) {
log.info("Returning excess container {}.", excessContainer.getId());
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
 
-   private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
-   numPendingContainerRequests--;
-
-   log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests);
-
+   private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
+   log.info("Removing container request {}.", pendingContainerRequest);
+   pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   Put differently: when do we clean up `workerSpecContainerResourceAdapter`?


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403092773
 
 

 ##
 File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -176,6 +185,15 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
SlotManager getSlotManager() {
return this.slotManager;
}
+
+   @Override
+   public Map getPendingWorkerNums() {
+   return customPendingWorkerNums != null ? customPendingWorkerNums : super.getPendingWorkerNums();
+   }
+
+   public void setCustomPendingWorkerNums(final Map customPendingWorkerNums) {
+   this.customPendingWorkerNums = customPendingWorkerNums;
+   }
 
 Review comment:
   I think this is pretty much whitebox testing, as it strongly relies on 
internal implementation details. I would recommend going another way: either 
rely on the public APIs of the component or encapsulate the bookkeeping logic 
so that it can be tested separately.


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403123867
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -363,31 +356,64 @@ public void onContainersCompleted(final List<ContainerStatus> statuses) {
@Override
public void onContainersAllocated(List<Container> containers) {
runAsync(() -> {
-   log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests);
-   final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
-   final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();
+   log.info("Received {} containers.", containers.size());
 
-   // number of allocated containers can be larger than the number of pending container requests
-   final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests);
-   final List<Container> requiredContainers = containers.subList(0, numAcceptedContainers);
-   final List<Container> excessContainers = containers.subList(numAcceptedContainers, containers.size());
-
-   for (int i = 0; i < requiredContainers.size(); i++) {
-   removeContainerRequest(pendingRequestsIterator.next());
-   }
-
-   excessContainers.forEach(this::returnExcessContainer);
-   requiredContainers.forEach(this::startTaskExecutorInContainer);
+   groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated);
 
 Review comment:
   I'd suggest using a for-each loop here.
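For reference, a sketch of the grouped handling written with an explicit for-each loop over `entrySet()` rather than `Map.forEach(this::onContainersOfResourceAllocated)`. Containers and resources are plain strings here (hypothetical stand-ins), and a `LinkedHashMap` is used only so that the output order is deterministic.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ForEachLoopDemo {
    /** Groups items by key, keeping the encounter order of the keys. */
    static <T, K> Map<K, List<T>> groupBy(List<T> items, Function<T, K> keyFn) {
        return items.stream()
            .collect(Collectors.groupingBy(keyFn, LinkedHashMap::new, Collectors.toList()));
    }

    /** Plain for-each over the grouped entries; each group is handled in the loop body. */
    static List<String> handleAllocated(List<String> containers, Function<String, String> resourceOf) {
        List<String> handled = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groupBy(containers, resourceOf).entrySet()) {
            handled.add(entry.getKey() + " -> " + entry.getValue().size());
        }
        return handled;
    }
}
```

The loop body can use local variables and `return`/`continue` directly, which a method reference passed to `forEach` cannot.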


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403088473
 
 

 ##
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+   final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated
 
 Review comment:
   Btw: where do we change the `Configuration`?


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403127546
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId
 
final ResourceID resourceId = new ResourceID(containerId.toString());
// release the failed container
-   workerNodeMap.remove(resourceId);
+   YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
-   requestYarnContainerIfRequired();
+   requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource());
}
 
private void returnExcessContainer(Container excessContainer) {
log.info("Returning excess container {}.", excessContainer.getId());
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
 
-   private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
-   numPendingContainerRequests--;
-
-   log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests);
-
+   private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
+   log.info("Removing container request {}.", pendingContainerRequest);
+   pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   What about removing it from `workerSpecContainerResourceAdapter`?


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403107046
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java
 ##
 @@ -34,4 +37,24 @@
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j.");
+
+   /**
+* **DO NOT USE** Whether {@link YarnResourceManager} should match the vcores of allocated containers with those requested.
+*
+* By default, Yarn ignores vcores in the container requests, and always allocates 1 vcore for each container.
+* Only if 'yarn.scheduler.capacity.resource-calculator' is set to 'DominantResourceCalculator' for Yarn will it
+* allocate container vcores as requested. Unfortunately, this configuration option is dedicated to the Yarn scheduler,
+* and is only accessible to applications in Hadoop 2.6+.
+*
+* ATM, it should be fine to not match vcores, because with the current {@link SlotManagerImpl} all the TM
+* containers should have the same resources.
+*
+* If later we add another {@link SlotManager} implementation that may have TMs with different resources, we can
+* switch this option on only for the new SM, and the new SM can also be available on Hadoop 2.6+ only.
+*/
+   public static final ConfigOption<Boolean> MATCH_CONTAINER_VCORES =
+   key("$internal.yarn.resourcemanager.enable-vcore-matching")
+   .booleanType()
+   .defaultValue(false)
+   .withDescription("**DO NOT USE** Whether YarnResourceManager should match the container vcores.");
 
 Review comment:
   Does this mean that one has to configure one's Flink cluster depending on 
the configuration of the Yarn cluster? What happens if one forgets to do so?


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403108543
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -584,6 +590,85 @@ public void testGetCpuExceedMaxInt() throws Exception {
}};
}
 
+   @Test
+   public void testWorkerSpecContainerResourceAdapter_MatchVcores() {
+   final int minMemMB = 100;
+   final int minVcore = 10;
+   final YarnResourceManager.WorkerSpecContainerResourceAdapter adapter =
+   new YarnResourceManager.WorkerSpecContainerResourceAdapter(
+   getConfigProcessSpecEqualsWorkerSpec(), minMemMB, minVcore, true);
 
 Review comment:
   It is usually easier to understand if one uses an enum instead of a boolean, 
because one can give the different values expressive names (e.g. 
`MATCH_VCORES`, `IGNORE_VCORES`).
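For illustration, a minimal sketch of this suggestion. It assumes a hypothetical `VcoreMatching` enum and a simplified, hypothetical `ContainerResourceAdapter`; neither is Flink's actual `WorkerSpecContainerResourceAdapter` API. The enum constant makes the call site self-describing where a bare `true` does not:

```java
public class VcoreMatchingExample {

    // Hypothetical enum replacing a bare boolean constructor argument.
    enum VcoreMatching {
        MATCH_VCORES,
        IGNORE_VCORES
    }

    // Hypothetical, simplified adapter; not Flink's WorkerSpecContainerResourceAdapter.
    static final class ContainerResourceAdapter {
        private final VcoreMatching vcoreMatching;

        ContainerResourceAdapter(VcoreMatching vcoreMatching) {
            this.vcoreMatching = vcoreMatching;
        }

        // Memory must always be equal; vcores are compared only for MATCH_VCORES.
        boolean matches(int requestedMemMB, int requestedVcores,
                        int containerMemMB, int containerVcores) {
            boolean memoryMatches = containerMemMB == requestedMemMB;
            if (vcoreMatching == VcoreMatching.IGNORE_VCORES) {
                return memoryMatches;
            }
            return memoryMatches && containerVcores == requestedVcores;
        }
    }

    public static void main(String[] args) {
        // Compare with an opaque call such as `new Adapter(..., true)`.
        ContainerResourceAdapter matching = new ContainerResourceAdapter(VcoreMatching.MATCH_VCORES);
        ContainerResourceAdapter ignoring = new ContainerResourceAdapter(VcoreMatching.IGNORE_VCORES);
        System.out.println(matching.matches(1024, 2, 1024, 4)); // false
        System.out.println(ignoring.matches(1024, 2, 1024, 4)); // true
    }
}
```

A test reading `new ContainerResourceAdapter(VcoreMatching.MATCH_VCORES)` states its intent without the reader having to look up the constructor signature.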




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403118602
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl {
+
+   private Function, List>> getMatchingRequestsFunction = ignored -> Collections.emptyList();
+   private Consumer> addContainerRequestConsumer = ignored -> {};
+   private Consumer> removeContainerRequestConsumer = ignored -> {};
+   private Consumer> releaseAssignedContainerConsumer = ignored -> {};
 
 Review comment:
   I'd suggest using a `BiConsumer` here.
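A minimal sketch of what the `BiConsumer` variant could look like. The generic types were lost in this message, so this assumes the original field was a `Consumer` of a `Tuple2` holding the request and the client; `ContainerRequest` here is a hypothetical stand-in for `AMRMClient.ContainerRequest`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class TestingClientSketch {

    // Hypothetical stand-in for AMRMClient.ContainerRequest.
    static final class ContainerRequest {
        final int memoryMB;

        ContainerRequest(int memoryMB) {
            this.memoryMB = memoryMB;
        }
    }

    // Assumed "before": Consumer<Tuple2<ContainerRequest, TestingClientSketch>>.
    // "After": a BiConsumer receives both values directly, no tuple wrapper needed.
    private BiConsumer<ContainerRequest, TestingClientSketch> addContainerRequestConsumer =
        (ignoredRequest, ignoredClient) -> { };

    void setAddContainerRequestConsumer(BiConsumer<ContainerRequest, TestingClientSketch> consumer) {
        this.addContainerRequestConsumer = consumer;
    }

    // Test hook invoked where the real client would register a container request.
    public void addContainerRequest(ContainerRequest request) {
        addContainerRequestConsumer.accept(request, this);
    }

    public static void main(String[] args) {
        List<Integer> recordedMemory = new ArrayList<>();
        TestingClientSketch client = new TestingClientSketch();
        client.setAddContainerRequestConsumer((request, c) -> recordedMemory.add(request.memoryMB));
        client.addContainerRequest(new ContainerRequest(1024));
        System.out.println(recordedMemory); // [1024]
    }
}
```

The test lambda can then name both parameters instead of unpacking `f0`/`f1` from a tuple.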




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403132911
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactoryTest.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.entrypoint;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link KubernetesResourceManagerFactory}.
+ */
+public class KubernetesResourceManagerFactoryTest {
 
 Review comment:
   ```suggestion
   public class KubernetesResourceManagerFactoryTest extends TestLogger {
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403093997
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -321,6 +339,47 @@ public void testGetCpuCoresNumSlots() {
assertThat(resourceManager.getCpuCores(configuration), is(3.0));
}
 
+   @Test
+   public void testStartAndRecoverVariousResourceSpec() {
+   // Start two workers with different resources
+   final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec(1.0, 100, 0, 100, 100);
+   final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec(1.0, 99, 0, 100, 100);
+   resourceManager.startNewWorker(workerResourceSpec1);
+   resourceManager.startNewWorker(workerResourceSpec2);
+
+   // Verify two pods with both worker resources are started
+   final PodList initialPodList = kubeClient.pods().list();
+   assertEquals(2, initialPodList.getItems().size());
+   final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+   final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20));
+
+   // Notify resource manager about pods added.
+   final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1);
+   final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2);
+   resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2));
+
+   // Terminate pod1.
+   terminatePod(initialPod1);
+   resourceManager.setCustomPendingWorkerNums(Collections.singletonMap(workerResourceSpec1, 1));
 
 Review comment:
   I would recommend not testing the component like this. It requires detailed 
knowledge of the component's internals and makes it harder to evolve, because 
this test relies on the fact that the `KubernetesResourceManager` has a map of 
`WorkerResourceSpec` to `Integer`.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403083789
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode worker) {
@Override
public void onAdded(List<KubernetesPod> pods) {
runAsync(() -> {
-   for (KubernetesPod pod : pods) {
-   if (numPendingPodRequests > 0) {
-   numPendingPodRequests--;
+   pods.forEach(pod -> {
 
 Review comment:
   Call me old-fashioned, but I think the for-each loop `for (KubernetesPod 
pod : pods)` is superior to `forEach`.
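One concrete difference between the two styles: the enhanced `for` loop can update local variables, whereas a `forEach` lambda may only capture effectively final ones. A minimal sketch using a hypothetical, simplified `onAdded` that mirrors the removed `numPendingPodRequests` lines (pods are plain strings here, and the returned count is added only for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class PodLoopSketch {

    // Hypothetical counter mirroring numPendingPodRequests from the removed lines.
    private int numPendingPodRequests;

    PodLoopSketch(int numPendingPodRequests) {
        this.numPendingPodRequests = numPendingPodRequests;
    }

    // Enhanced for loop, as the reviewer suggests. Returns how many pods
    // were matched to pending requests (a hypothetical return value).
    int onAdded(List<String> pods) {
        int matched = 0;
        for (String pod : pods) {
            if (numPendingPodRequests > 0) {
                numPendingPodRequests--;
                matched++; // allowed: 'matched' is an ordinary local variable
            }
        }
        // With pods.forEach(pod -> { ... matched++; ... }) this would not compile,
        // because lambdas may only capture effectively final local variables.
        return matched;
    }

    public static void main(String[] args) {
        PodLoopSketch rm = new PodLoopSketch(2);
        System.out.println(rm.onAdded(Arrays.asList("pod-1", "pod-2", "pod-3"))); // 2
    }
}
```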




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403121671
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+
+import java.util.function.Consumer;
+
+/**
+ * A Yarn {@link NMClientAsync} implementation for testing.
+ */
+class TestingYarnNMClientAsync extends NMClientAsyncImpl {
 
 Review comment:
   Same here: `NMClientAsyncImpl` seems to be unstable and might change 
depending on the Yarn version used.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-16670) Support User-Defined Metrics in Python UDF

2020-04-03 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-16670.
---
Resolution: Resolved

> Support User-Defined Metrics in  Python UDF
> ---
>
> Key: FLINK-16670
> URL: https://issues.apache.org/jira/browse/FLINK-16670
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.11.0
>
>
> This is the umbrella Jira for FLIP-112, which intends to support User-Defined 
> Metrics in Python UDF.
> FLIP wiki page: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-112%3A+Support+User-Defined+Metrics+in++Python+UDF]
> FLIP-58 added support for Python UDFs, but user-defined metrics are not 
> supported yet. With metrics, users can report and monitor the UDF status 
> to get a deeper understanding of the execution. In this FLIP, we propose to:
>  * Support user-defined metrics including Counters, Gauges, Meters, and 
> Distributions in Python UDFs. (Note: Histogram is not supported in this FLIP; 
> instead, Distribution is supported to report statistics about the 
> distribution of values. See more in the Distribution section.)
>  * Support defining user scopes.
>  * Support defining user variables.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * f128770ed2a5816a1f460734f736fd07b273e896 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043)
 
   * cb1fc7686309a1ad8a6278d060f51a24d74dbc00 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Updated] (FLINK-15836) Start a new pods watcher in KubernetesResourceManager when the old one is closed with exception

2020-04-03 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-15836:
--
Fix Version/s: (was: 1.10.1)

> Start a new pods watcher in KubernetesResourceManager when the old one is 
> closed with exception
> ---
>
> Key: FLINK-15836
> URL: https://issues.apache.org/jira/browse/FLINK-15836
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in the PR [1], if {{watchReconnectLimit}} is configured 
> by users via Java properties or environment variables, the watch may be stopped 
> and subsequent pod changes will not be processed. So we need to start a new 
> pods watcher in {{KubernetesResourceManager}} when the old one is closed with 
> an exception.
> [1]. [https://github.com/apache/flink/pull/10965#discussion_r373491974]
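A minimal sketch of the restart-on-exception idea. It assumes hypothetical `PodsWatch` and `PodsWatchFactory` interfaces in place of the real Kubernetes client watch API, and ignores threading concerns:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PodsWatcherRestartSketch {

    // Hypothetical handle for a running pods watch.
    interface PodsWatch {
        void close();
    }

    // Hypothetical factory: starts a watch and invokes onClosedWithException
    // if the watch terminates abnormally (e.g. after the reconnect limit is hit).
    interface PodsWatchFactory {
        PodsWatch watchPods(Consumer<Exception> onClosedWithException);
    }

    private final PodsWatchFactory factory;
    private PodsWatch currentWatch;
    private int startedWatches;

    PodsWatcherRestartSketch(PodsWatchFactory factory) {
        this.factory = factory;
    }

    void start() {
        startedWatches++;
        currentWatch = factory.watchPods(this::onWatchClosedWithException);
    }

    // The old watch is already closed; replace it instead of silently
    // missing all subsequent pod events. Threading concerns are ignored here.
    private void onWatchClosedWithException(Exception cause) {
        System.err.println("Pods watch closed with exception, restarting: " + cause.getMessage());
        start();
    }

    // Closes the current watch on shutdown (a regular close does not restart).
    void stop() {
        if (currentWatch != null) {
            currentWatch.close();
            currentWatch = null;
        }
    }

    int startedWatches() {
        return startedWatches;
    }

    public static void main(String[] args) {
        // Fake factory that keeps each callback so a failure can be simulated.
        List<Consumer<Exception>> callbacks = new ArrayList<>();
        PodsWatcherRestartSketch sketch = new PodsWatcherRestartSketch(onError -> {
            callbacks.add(onError);
            return () -> { };
        });
        sketch.start();
        callbacks.get(0).accept(new RuntimeException("watch closed"));
        System.out.println(sketch.startedWatches()); // 2
        sketch.stop();
    }
}
```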





[GitHub] [flink] flinkbot edited a comment on issue #11628: [FLINK-14504][rest] Add Cluster DataSet REST API

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11628: [FLINK-14504][rest] Add Cluster 
DataSet REST API
URL: https://github.com/apache/flink/pull/11628#issuecomment-608370153
 
 
   
   ## CI report:
   
   * 44066c510cd12ae93baf839652b1bb2b43cc8cab Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158181648) Azure: 
[CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7038)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * f128770ed2a5816a1f460734f736fd07b273e896 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043)
 
   
   




[jira] [Closed] (FLINK-16045) Extract connectors documentation to a top-level section

2020-04-03 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-16045.

Fix Version/s: 1.11.0
   Resolution: Fixed

master: c3251f5a0f7e0b2dc9240aa94221ca860b0f1988

> Extract connectors documentation to a top-level section
> ---
>
> Key: FLINK-16045
> URL: https://issues.apache.org/jira/browse/FLINK-16045
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.11.0
>
>






[jira] [Closed] (FLINK-16044) Extract libraries documentation to a top-level section

2020-04-03 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-16044.

Fix Version/s: 1.11.0
   Resolution: Fixed

master: e25f0d3c322982291005369da8a5a7cf62d4e59a

> Extract libraries documentation to a top-level section
> --
>
> Key: FLINK-16044
> URL: https://issues.apache.org/jira/browse/FLINK-16044
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[GitHub] [flink] aljoscha closed pull request #11367: [FLINK-16044] / [FLINK-16045] Make libraries/connectors documentation top-level sections

2020-04-03 Thread GitBox
aljoscha closed pull request #11367: [FLINK-16044] / [FLINK-16045] Make 
libraries/connectors documentation top-level sections
URL: https://github.com/apache/flink/pull/11367
 
 
   




[jira] [Created] (FLINK-16976) Update chinese documentation for ListCheckpointed deprecation

2020-04-03 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-16976:


 Summary: Update chinese documentation for ListCheckpointed 
deprecation
 Key: FLINK-16976
 URL: https://issues.apache.org/jira/browse/FLINK-16976
 Project: Flink
  Issue Type: Bug
  Components: chinese-translation, Documentation
Reporter: Aljoscha Krettek
 Fix For: 1.11.0


The change for the English documentation is in 
https://github.com/apache/flink/commit/10aadfc6906a1629f7e60eacf087e351ba40d517

The original Jira issue is FLINK-6258.





[GitHub] [flink] flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make 
FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#issuecomment-599949839
 
 
   
   ## CI report:
   
   * e5e11418358bf450d1cca543916bbd7d695375b1 UNKNOWN
   * ad481f5d846621032feb21e409690eed5b114191 UNKNOWN
   * 76235a9c945643047057064bda8ad49d3e90568c Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/158210509) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7041)
 
   * 2a76c7689c032056a462a615c28f27f1361f1f0e Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158236401) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7045)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11629: [FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for filesystem table sink.

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11629: 
[FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for 
filesystem table sink.
URL: https://github.com/apache/flink/pull/11629#issuecomment-608466138
 
 
   
   ## CI report:
   
   * 5a3f35f15271416d41248df519aa8d3ac1bbe5ea Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/158224123) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7042)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11627: [FLINK-16770][e2e] Make the checkpoint resuming e2e case pass by increasing the…

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11627: [FLINK-16770][e2e] Make the 
checkpoint resuming e2e case pass by increasing the…
URL: https://github.com/apache/flink/pull/11627#issuecomment-608340311
 
 
   
   ## CI report:
   
   * 59717320f953e6c4d3cfefe64c830fafee4d2c40 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158169993) Azure: 
[CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7037)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11427: [FLINK-15790][k8s] Make 
FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#issuecomment-599949839
 
 
   
   ## CI report:
   
   * e5e11418358bf450d1cca543916bbd7d695375b1 UNKNOWN
   * ad481f5d846621032feb21e409690eed5b114191 UNKNOWN
   * 76235a9c945643047057064bda8ad49d3e90568c Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158210509) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7041)
 
   * 2a76c7689c032056a462a615c28f27f1361f1f0e UNKNOWN
   
   




[GitHub] [flink] GJL commented on issue #11571: [FLINK-16710][runtime] Log Upload blocks Main Thread in TaskExecutor

2020-04-03 Thread GitBox
GJL commented on issue #11571: [FLINK-16710][runtime] Log Upload blocks Main 
Thread in TaskExecutor
URL: https://github.com/apache/flink/pull/11571#issuecomment-608502360
 
 
   Merging.




[GitHub] [flink] wangyang0918 commented on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-03 Thread GitBox
wangyang0918 commented on issue #11427: [FLINK-15790][k8s] Make FlinkKubeClient 
and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#issuecomment-608499803
 
 
   @azagrebin Thanks for your review. I have addressed most of your comments; 
the remaining ones are not applicable now that only `createTaskManagerPod` 
and `stopPod` are asynchronous.
   Would you please take a look again at your convenience?




[jira] [Closed] (FLINK-6258) Deprecate ListCheckpointed interface

2020-04-03 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-6258.
---
Fix Version/s: 1.11.0
 Release Note: The `ListCheckpointed` interface has been deprecated because 
it uses Java Serialization for checkpointing state which is problematic for 
savepoint compatibility. Use the `CheckpointedFunction` interface instead, 
which gives more control over state serialization.
   Resolution: Fixed

master: 10aadfc6906a1629f7e60eacf087e351ba40d517

> Deprecate ListCheckpointed interface
> 
>
> Key: FLINK-6258
> URL: https://issues.apache.org/jira/browse/FLINK-6258
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Per discussion in https://github.com/apache/flink/pull/3508, we consider 
> deprecating the `ListCheckpointed` interface to discourage Java serialization 
> shortcuts for state registrations (towards this, the Java serialization 
> shortcuts provided by the `OperatorStateStore` interface have already been 
> deprecated in https://github.com/apache/flink/pull/3508).
> We should also remember to update 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state
>  if we decide to do this.





[GitHub] [flink] aljoscha closed pull request #11618: [FLINK-6258] Deprecate ListCheckpointed interface

2020-04-03 Thread GitBox
aljoscha closed pull request #11618: [FLINK-6258] Deprecate ListCheckpointed 
interface
URL: https://github.com/apache/flink/pull/11618
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #11629: [FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for filesystem table sink.

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11629: 
[FLINK-14267][connectors/filesystem]Introduce BaseRow Encoder in csv for 
filesystem table sink.
URL: https://github.com/apache/flink/pull/11629#issuecomment-608466138
 
 
   
   ## CI report:
   
   * 5a3f35f15271416d41248df519aa8d3ac1bbe5ea Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158224123) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7042)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce embedded executor and use it for Web Submission

2020-04-03 Thread GitBox
flinkbot edited a comment on issue #11460: [FLINK-16655][FLINK-16657] Introduce 
embedded executor and use it for Web Submission
URL: https://github.com/apache/flink/pull/11460#issuecomment-601593052
 
 
   
   ## CI report:
   
   * 349f5d7bfd68016ba3595a17ff3a1533969581fb UNKNOWN
   * 1ef9863ccbeddc51317ec90fa662fd10a797b908 UNKNOWN
   * 48c7c6996d88457bc4ab2cf0f9a8060c76499cfd Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/158007513) Azure: 
[CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7005)
 
   * f128770ed2a5816a1f460734f736fd07b273e896 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/158227253) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7043)
 
   
   
   




[GitHub] [flink] tillrohrmann commented on issue #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on issue #11320: [FLINK-16437][runtime] Make SlotManager 
allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#issuecomment-608491962
 
 
   I think the last part of my comment was answered by #11353.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403037228
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -153,6 +155,8 @@ public SlotManagerImpl(
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
this.numSlotsPerWorker = numSlotsPerWorker;
+   this.defaultSlotResourceProfile = defaultWorkerResourceSpec != null ?
+   generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker) : null;
 
 Review comment:
   Instead of setting `defaultSlotResourceProfile` to `null`, can't we set it 
to `ResourceProfile.UNKNOWN` or something like this?
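   One way to read this suggestion is the null-object pattern: store a shared sentinel instead of `null`, so callers test for "unknown" explicitly rather than risking a `NullPointerException`. The sketch below is an assumption-laden illustration: `Profile`, `DefaultSlotProfiles` and the even split of worker CPU across slots are hypothetical simplifications, not Flink's `ResourceProfile` or the PR's `generateDefaultSlotResourceProfile`.

```java
// Hypothetical, simplified stand-in for a resource profile (not Flink's ResourceProfile).
final class Profile {
    // Sentinel used instead of null when no default worker spec is configured.
    static final Profile UNKNOWN = new Profile(Double.NaN);

    final double cpuCores;

    Profile(double cpuCores) {
        this.cpuCores = cpuCores;
    }

    // Identity check: only the shared sentinel instance counts as unknown.
    boolean isUnknown() {
        return this == UNKNOWN;
    }
}

final class DefaultSlotProfiles {
    private DefaultSlotProfiles() {}

    // Returns the UNKNOWN sentinel instead of null when there is no worker spec.
    // Assumption: the worker's CPU is split evenly across its slots.
    static Profile defaultSlotProfile(Double workerCpuCores, int numSlotsPerWorker) {
        if (workerCpuCores == null) {
            return Profile.UNKNOWN;
        }
        return new Profile(workerCpuCores / numSlotsPerWorker);
    }
}
```

   The design trade-off: a sentinel removes `@Nullable` from the field, but every consumer must still handle the unknown case deliberately.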




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403037763
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
return false;
}
 
-   private Optional allocateResource(ResourceProfile resourceProfile) {
-   final Collection requestedSlots = resourceActions.allocateResource(resourceProfile);
+   private Optional allocateResource(ResourceProfile requestedSlotResourceProfile) {
+   if (defaultWorkerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
+   return Optional.empty();
+   }
 
 Review comment:
   I think here we are leaking details from the enclosing component into the 
`SlotManager`. The `SlotManager` should not decide for the 
`ResourceManager` whether it can allocate new resources or not.
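   A minimal sketch of the separation this comment argues for, assuming a hypothetical callback that reports whether a worker could be requested: the slot manager only asks, and the component that knows the deployment mode (standalone vs. active) answers. `WorkerAllocator`, `SimpleSlotManager` and the `String` placeholder for the worker spec are illustrative names, not the PR's API.

```java
import java.util.Optional;

// Hypothetical callback owned by the enclosing resource manager.
interface WorkerAllocator {
    // Returns true if a new worker was requested; a standalone
    // deployment would simply return false.
    boolean allocateWorker(String workerSpec);
}

final class SimpleSlotManager {
    private final WorkerAllocator allocator;
    private final String defaultWorkerSpec; // placeholder for a worker resource spec

    SimpleSlotManager(WorkerAllocator allocator, String defaultWorkerSpec) {
        this.allocator = allocator;
        this.defaultWorkerSpec = defaultWorkerSpec;
    }

    // No deployment-mode check here: the slot manager asks the allocator
    // and only reacts to its answer.
    Optional<String> allocateResource() {
        return allocator.allocateWorker(defaultWorkerSpec)
                ? Optional.of(defaultWorkerSpec)
                : Optional.empty();
    }
}
```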




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403060510
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1051,13 +1050,13 @@ protected abstract void internalDeregisterApplication(
@Nullable String optionalDiagnostics) throws ResourceManagerException;
 
/**
-* Allocates a resource using the resource profile.
+* Allocates a resource using the worker resource specification.
 *
-* @param resourceProfile The resource description
+* @param workerResourceSpec that describes the to be allocated worker.
 
 Review comment:
   ```suggestion
 * @param workerResourceSpec that describes the to be allocated worker
   ```
   
   `@param`, `@return` and `@throws` tags are usually not terminated by a period.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403048662
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
 ##
 @@ -52,6 +53,9 @@
  */
 public class SlotManagerFailUnfulfillableTest extends TestLogger {
 
+   private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = new WorkerResourceSpec(
+   100.0, 1, 1, 1, 1);
 
 Review comment:
   Looking at this line, I would recommend introducing a builder for the 
`WorkerResourceSpec` and making its constructor private.
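   A minimal sketch of such a builder, assuming five fields that mirror the five positional arguments of the constructor call above (CPU cores plus four memory sizes in MB). `WorkerSpec` and its field and setter names are hypothetical; the real `WorkerResourceSpec` may name and type them differently.

```java
// Hypothetical simplified spec; field names are assumed, not taken from the PR.
final class WorkerSpec {
    final double cpuCores;
    final int taskHeapMB;
    final int taskOffHeapMB;
    final int networkMB;
    final int managedMB;

    // Private: instances can only be created through the builder.
    private WorkerSpec(Builder b) {
        this.cpuCores = b.cpuCores;
        this.taskHeapMB = b.taskHeapMB;
        this.taskOffHeapMB = b.taskOffHeapMB;
        this.networkMB = b.networkMB;
        this.managedMB = b.managedMB;
    }

    static final class Builder {
        private double cpuCores;
        private int taskHeapMB;
        private int taskOffHeapMB;
        private int networkMB;
        private int managedMB;

        // Each setter returns the builder so calls can be chained.
        Builder setCpuCores(double cpuCores) { this.cpuCores = cpuCores; return this; }
        Builder setTaskHeapMB(int mb) { this.taskHeapMB = mb; return this; }
        Builder setTaskOffHeapMB(int mb) { this.taskOffHeapMB = mb; return this; }
        Builder setNetworkMB(int mb) { this.networkMB = mb; return this; }
        Builder setManagedMB(int mb) { this.managedMB = mb; return this; }

        WorkerSpec build() { return new WorkerSpec(this); }
    }
}
```

   With it, a test constant reads as named setters (`new WorkerSpec.Builder().setCpuCores(100.0).setTaskHeapMB(1)...build()`) instead of five unlabeled numbers.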




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403053905
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
//  Resource Management
// 

 
-   protected int getNumberRequiredTaskManagerSlots() {
-   return slotManager.getNumberPendingTaskManagerSlots();
+   protected int getNumberRequiredTaskManagers() {
return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);
 
 Review comment:
   For the time being, I would add a `checkState` which ensures that all 
`WorkerResourceSpecs` can be fulfilled by the container size we use 
to start new containers/pods.
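   A sketch of such a guard, assuming a simplified spec with only CPU and memory and a single container size used for every new container/pod. `Spec`, `fulfills` and `RequiredWorkers.requiredWorkers` are hypothetical names. A failed check throws `IllegalStateException`, which is what a failed `Preconditions.checkState` does; the final sum mirrors the `reduce(0, Integer::sum)` in the diff.

```java
import java.util.Map;

// Hypothetical simplified worker/container description.
final class Spec {
    final double cpuCores;
    final int memoryMB;

    Spec(double cpuCores, int memoryMB) {
        this.cpuCores = cpuCores;
        this.memoryMB = memoryMB;
    }

    // True if a container of this size can host the requested worker.
    boolean fulfills(Spec required) {
        return cpuCores >= required.cpuCores && memoryMB >= required.memoryMB;
    }
}

final class RequiredWorkers {
    private RequiredWorkers() {}

    // Verifies that every requested spec fits into the container size used to
    // start new containers/pods, then sums the pending worker counts.
    static int requiredWorkers(Map<Spec, Integer> pendingWorkers, Spec containerSize) {
        for (Spec spec : pendingWorkers.keySet()) {
            if (!containerSize.fulfills(spec)) {
                throw new IllegalStateException(
                        "Container size cannot fulfill a requested worker resource spec");
            }
        }
        return pendingWorkers.values().stream().reduce(0, Integer::sum);
    }
}
```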




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403032569
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -133,14 +133,17 @@
@Nullable
private final WorkerResourceSpec defaultWorkerResourceSpec;
 
+   private final int numSlotsPerWorker;
+
public SlotManagerImpl(
SlotMatchingStrategy slotMatchingStrategy,
ScheduledExecutor scheduledExecutor,
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout,
boolean waitResultConsumedBeforeRelease,
-   @Nullable WorkerResourceSpec defaultWorkerResourceSpec) {
+   @Nullable WorkerResourceSpec defaultWorkerResourceSpec,
+   int numSlotsPerWorker) {
 
 Review comment:
   Maybe it would make sense to pass in the `SlotManagerConfiguration`. If this 
is possible, then one would not have to add another parameter to the constructor 
every time we add a new configuration parameter for the `SlotManager`.
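   The parameter-object refactoring proposed here can be sketched as follows. `SlotManagerConfig` and `ConfiguredSlotManager` are hypothetical simplifications (timeouts as `java.time.Duration`, the worker spec as a `String` placeholder), not Flink's `SlotManagerConfiguration` or `SlotManagerImpl`.

```java
import java.time.Duration;

// Hypothetical parameter object bundling the slot manager's settings.
final class SlotManagerConfig {
    final Duration taskManagerRequestTimeout;
    final Duration slotRequestTimeout;
    final Duration taskManagerTimeout;
    final boolean waitResultConsumedBeforeRelease;
    final String defaultWorkerSpec; // placeholder type; null in standalone mode
    final int numSlotsPerWorker;

    SlotManagerConfig(
            Duration taskManagerRequestTimeout,
            Duration slotRequestTimeout,
            Duration taskManagerTimeout,
            boolean waitResultConsumedBeforeRelease,
            String defaultWorkerSpec,
            int numSlotsPerWorker) {
        this.taskManagerRequestTimeout = taskManagerRequestTimeout;
        this.slotRequestTimeout = slotRequestTimeout;
        this.taskManagerTimeout = taskManagerTimeout;
        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
        this.defaultWorkerSpec = defaultWorkerSpec;
        this.numSlotsPerWorker = numSlotsPerWorker;
    }
}

// Takes one configuration argument instead of one argument per setting,
// so a new setting changes SlotManagerConfig, not this constructor.
final class ConfiguredSlotManager {
    private final SlotManagerConfig config;

    ConfiguredSlotManager(SlotManagerConfig config) {
        this.config = config;
    }

    int numSlotsPerWorker() {
        return config.numSlotsPerWorker;
    }
}
```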




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054549
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -207,6 +208,18 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
}
 
@Override
+   public Map getPendingWorkerNums() {
+   final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);
+   if (pendingWorkerNum > 0) {
+   return Collections.singletonMap(
+   Preconditions.checkNotNull(defaultWorkerResourceSpec,
+   "There should never be pending slots/workers in standalone mode."),
 
 Review comment:
   I think this should not be necessary.
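   Independent of the `checkNotNull` question, the pending-worker count in the diff is a ceiling division of pending slots by slots per worker. A sketch of an integer-only equivalent, assuming `pendingSlots >= 0` and `numSlotsPerWorker > 0`; `PendingWorkers.pendingWorkerNum` is a hypothetical standalone helper, not the PR's method.

```java
final class PendingWorkers {
    private PendingWorkers() {}

    // Ceiling division in integer arithmetic. For non-negative pendingSlots and
    // positive numSlotsPerWorker it gives the same result as
    // (int) Math.ceil((double) pendingSlots / numSlotsPerWorker), without a
    // floating-point round trip and without adding to pendingSlots (no overflow).
    static int pendingWorkerNum(int pendingSlots, int numSlotsPerWorker) {
        if (pendingSlots < 0 || numSlotsPerWorker <= 0) {
            throw new IllegalArgumentException(
                    "pendingSlots must be >= 0 and numSlotsPerWorker must be > 0");
        }
        return pendingSlots / numSlotsPerWorker
                + (pendingSlots % numSlotsPerWorker == 0 ? 0 : 1);
    }
}
```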




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054142
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -51,7 +53,7 @@
 
int getNumberFreeSlotsOf(InstanceID instanceId);
 
-   int getNumberPendingTaskManagerSlots();
+   Map getPendingWorkerNums();
 
 Review comment:
   `JavaDoc` missing.
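   A possible shape for the missing JavaDoc, inferred from the `SlotManagerImpl` hunk (pending slots grouped into workers of the default worker resource spec). The `Map<WorkerResourceSpec, Integer>` return type, the `PendingWorkerSource` interface and the empty `WorkerResourceSpec` placeholder are assumptions so the snippet compiles on its own; the wording is a suggestion, not the PR's text. It also follows the convention of not ending `@return` with a period.

```java
import java.util.Map;

// Placeholder so the snippet compiles on its own; stands in for Flink's WorkerResourceSpec.
final class WorkerResourceSpec {}

interface PendingWorkerSource {

    /**
     * Returns the number of workers that still have to be started to fulfill
     * the currently pending slot requests, grouped by worker resource spec.
     *
     * @return map from worker resource spec to the number of pending workers with that spec
     */
    Map<WorkerResourceSpec, Integer> getPendingWorkerNums();
}
```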




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403056960
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -156,8 +156,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
}
 
@Override
-   public boolean startNewWorker(ResourceProfile resourceProfile) {
-   LOG.info("Starting new worker with resource profile, {}", resourceProfile);
+   public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
+   LOG.info("Starting new worker with worker resource spec, {}", workerResourceSpec);
requestKubernetesPod();
 
 Review comment:
   I guess that we need a check that the configured pod fulfills 
`workerResourceSpec`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

