[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4887#discussion_r148716173

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -874,6 +894,13 @@ public void handleError(final Exception exception) {
     */
    public abstract boolean stopWorker(ResourceID resourceID);
 
+    /**
+     * Cancel the allocation of a resource. If the resource allocation has not fulfilled, should cancel it.
+     *
+     * @param resourceProfile The resource description of the previous allocation
+     */
+    public abstract void cancelNewWorker(ResourceProfile resourceProfile);
--- End diff --

I have commented on this in the SlotManager.

---
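A minimal sketch of how a concrete resource manager might honor a `cancelNewWorker` call, assuming it keeps a count of outstanding (not yet fulfilled) worker requests per resource profile. `PendingWorkerTracker` is hypothetical, profiles are plain strings standing in for Flink's `ResourceProfile`, and `cancelNewWorker` returns a boolean here (unlike the `void` method in the diff) only so the outcome is observable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for worker (container) requests that YARN/Mesos
// has not fulfilled yet. Profiles are plain Strings, not Flink's ResourceProfile.
class PendingWorkerTracker {
    private final Map<String, Integer> pending = new HashMap<>();

    // Record a new outstanding worker request for the given profile.
    void startNewWorker(String profile) {
        pending.merge(profile, 1, Integer::sum);
    }

    // Withdraw one outstanding request for the profile, if there is one.
    // Returns false when nothing was pending, so the call is a no-op.
    boolean cancelNewWorker(String profile) {
        return decrement(profile);
    }

    // A container for this profile arrived, so one request is fulfilled.
    void onWorkerAllocated(String profile) {
        decrement(profile);
    }

    int pendingCount(String profile) {
        return pending.getOrDefault(profile, 0);
    }

    private boolean decrement(String profile) {
        Integer count = pending.get(profile);
        if (count == null) {
            return false;
        }
        if (count == 1) {
            pending.remove(profile);
        } else {
            pending.put(profile, count - 1);
        }
        return true;
    }
}
```

On YARN, withdrawing a request would additionally require removing it from the AM-RM client (Hadoop's `AMRMClient#removeContainerRequest`); the counter alone only keeps the resource manager's own view consistent.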
[jira] [Commented] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.
[ https://issues.apache.org/jira/browse/FLINK-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237169#comment-16237169 ] ASF GitHub Bot commented on FLINK-7870:
---
Github user shuai-xu commented on a diff in the pull request:
https://github.com/apache/flink/pull/4887#discussion_r148716173

I have commented on this in the SlotManager.

> SlotPool should cancel the slot request to RM if not need any more.
> ---
>
> Key: FLINK-7870
> URL: https://issues.apache.org/jira/browse/FLINK-7870
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management
> Reporter: shuai.xu
> Assignee: shuai.xu
> Priority: Major
> Labels: flip-6
>
> 1. SlotPool requests slots from the RM if it does not have enough slots.
> 2. If a slot request is not fulfilled within a certain time, SlotPool treats the request as timed out and sends a new slot request by triggering a failover in the JobMaster. The previous request is no longer needed, but the RM does not know that.
> 3. This may cause the RM to request far more resources than the job really needs.
> For example:
> 1. A job needs 100 slots. The RM requests 100 containers from YARN.
> 2. YARN is busy and has no resources for the job.
> 3. The job fails over because the resource request was not fulfilled in time.
> 4. It asks for 100 slots again, and now the RM requests 200 containers from YARN.
> 5. If failover happens several times, the number of container requests keeps growing.
> 6. When YARN has resources again, it finds that the job appears to need thousands of containers. This is a waste of resources.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
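The growth in the FLINK-7870 example follows from simple arithmetic: if every failover re-requests the full set of slots and none of the earlier requests is fulfilled or cancelled, the number of outstanding requests after k failovers is 100 × (k + 1); if the old requests are cancelled first, it stays at 100. The hypothetical simulation below models both policies, assuming the cluster fulfills nothing in the meantime.

```java
// Illustrative model of outstanding container requests across failovers,
// assuming the cluster fulfills none of them while it is busy.
class FailoverRequestSimulation {

    // Returns the number of outstanding requests after `failovers` restarts,
    // each of which asks again for `slotsNeeded` slots.
    static int outstandingRequests(int slotsNeeded, int failovers, boolean cancelOnFailover) {
        int outstanding = slotsNeeded; // the initial request
        for (int i = 0; i < failovers; i++) {
            if (cancelOnFailover) {
                outstanding -= slotsNeeded; // withdraw the obsolete, unfulfilled requests
            }
            outstanding += slotsNeeded; // the restarted job asks again
        }
        return outstanding;
    }
}
```

`outstandingRequests(100, 1, false)` returns 200, matching step 4, and `outstandingRequests(100, 9, false)` returns 1000, while with cancellation the result stays at 100 for any number of failovers.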
[jira] [Commented] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.
[ https://issues.apache.org/jira/browse/FLINK-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237162#comment-16237162 ] ASF GitHub Bot commented on FLINK-7870:
---
Github user shuai-xu commented on a diff in the pull request:
https://github.com/apache/flink/pull/4887#discussion_r148715689

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
@@ -52,4 +52,6 @@
     * @param cause of the allocation failure
     */
    void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+
+    void cancelResourceAllocation(ResourceProfile resourceProfile);
--- End diff --

I have commented on this in the SlotManager.
[jira] [Commented] (FLINK-7870) SlotPool should cancel the slot request to RM if not need any more.
[ https://issues.apache.org/jira/browse/FLINK-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237161#comment-16237161 ] ASF GitHub Bot commented on FLINK-7870:
---
Github user shuai-xu commented on a diff in the pull request:
https://github.com/apache/flink/pull/4887#discussion_r148715651

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -302,7 +302,12 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
        PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
 
        if (null != pendingSlotRequest) {
-            cancelPendingSlotRequest(pendingSlotRequest);
+            if (pendingSlotRequest.isAssigned()) {
+                cancelPendingSlotRequest(pendingSlotRequest);
+            }
+            else {
+                resourceActions.cancelResourceAllocation(pendingSlotRequest.getResourceProfile());
--- End diff --

Yes, the SlotManager can decide to release resources beyond what is needed. But consider a worst case:

1. The Mesos or YARN cluster does not have enough resources.
2. A job asks for 100 workers of size A.
3. Because there are not enough resources, the job fails over; the previous 100 requests are not cancelled, and it asks for another 100.
4. This repeats several times, and the pending requests for workers of size A reach 1.
5. A worker of size B crashes, so the job now needs only 100 workers of size A and 1 worker of size B. But YARN or Mesos thinks the job needs 1 A and 1 B, because the requests are never cancelled.
6. Mesos/YARN now has resources for 110 A, which is more than enough for 100 A and 1 B, and it begins to assign resources to the job. But it first tries to allocate containers of size A for the 1 pending requests, so the job still cannot start because it lacks a container of size B.
7. As a result, the job may be unable to start for a long time, even after the cluster has enough resources.
8. This did happen in our cluster, because our cluster is busy.

So I think it is better to keep this protocol and let each resource manager handle it according to its needs.
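The branching in the `unregisterSlotRequest` change can be modeled in isolation. This is a simplified, self-contained sketch, not Flink's `SlotManager`: `SlotManagerActions`, `PendingRequest` and `SimpleSlotManager` are hypothetical stand-ins for `ResourceActions`, `PendingSlotRequest` and the real slot manager, and the existing cancellation path for assigned requests is only recorded, not executed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ResourceActions: the callback through which the
// slot manager talks back to the resource manager.
interface SlotManagerActions {
    void cancelResourceAllocation(String resourceProfile);
}

// Hypothetical stand-in for PendingSlotRequest.
class PendingRequest {
    final String resourceProfile;
    final boolean assigned; // true once the request has been matched to a slot

    PendingRequest(String resourceProfile, boolean assigned) {
        this.resourceProfile = resourceProfile;
        this.assigned = assigned;
    }
}

// Simplified model of the unregisterSlotRequest branching from the diff.
class SimpleSlotManager {
    private final Map<String, PendingRequest> pendingSlotRequests = new HashMap<>();
    private final SlotManagerActions resourceActions;
    // Allocation IDs of assigned requests that took the existing cancellation path.
    final List<String> cancelledAssignedRequests = new ArrayList<>();

    SimpleSlotManager(SlotManagerActions resourceActions) {
        this.resourceActions = resourceActions;
    }

    void registerSlotRequest(String allocationId, PendingRequest request) {
        pendingSlotRequests.put(allocationId, request);
    }

    boolean unregisterSlotRequest(String allocationId) {
        PendingRequest request = pendingSlotRequests.remove(allocationId);
        if (request == null) {
            return false; // unknown or already handled
        }
        if (request.assigned) {
            // Already matched to a slot: keep the existing cancellation path.
            cancelledAssignedRequests.add(allocationId);
        } else {
            // Not matched yet: the resource manager may still be waiting for a
            // container with this profile, so tell it the request is obsolete.
            resourceActions.cancelResourceAllocation(request.resourceProfile);
        }
        return true;
    }
}
```

Routing only unassigned requests back to the resource manager keeps the decision of what to do with them (withdraw a container request, or ignore it) in the resource-manager-specific implementation, which is the point argued in the comment.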
[jira] [Updated] (FLINK-7971) Fix potential NPE with inconsistent state
[ https://issues.apache.org/jira/browse/FLINK-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-7971: -- Description: In {{GroupAggProcessFunction}}, the status of {{state}} and {{cntState}} are not consistent, which may cause NPE when {{state}} is not null but {{cntState}} is null. was: In {{GroupAggProcessFunction}}, the status of {{state}} and {{cntState}} is not consistent, which may cause NPE when {{state}} is not null but {{cntState}} is null. > Fix potential NPE with inconsistent state > - > > Key: FLINK-7971 > URL: https://issues.apache.org/jira/browse/FLINK-7971 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > > In {{GroupAggProcessFunction}}, the status of {{state}} and {{cntState}} are > not consistent, which may cause NPE when {{state}} is not null but > {{cntState}} is null.
[jira] [Created] (FLINK-7971) Fix potential NPE with inconsistent state
Ruidong Li created FLINK-7971: - Summary: Fix potential NPE with inconsistent state Key: FLINK-7971 URL: https://issues.apache.org/jira/browse/FLINK-7971 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Ruidong Li Assignee: Ruidong Li Priority: Major In {{GroupAggProcessFunction}}, the status of {{state}} and {{cntState}} is not consistent, which may cause NPE when {{state}} is not null but {{cntState}} is null.
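One way to rule out this NPE is to treat the accumulator state and the record-count state as a unit: if either one is missing, discard both and reinitialize, instead of dereferencing a null count, and always clear them together. The sketch below models the two states as nullable fields; it is not the actual `GroupAggProcessFunction` code, and `GroupAggState`/`processElement` are hypothetical names.

```java
// Hypothetical model of a per-key aggregation with two pieces of state that
// must stay consistent: the accumulator and the number of records seen.
class GroupAggState {
    Long accumulator; // stands in for `state`
    Long count;       // stands in for `cntState`

    // Adds one record's value, or subtracts it for a retraction.
    void processElement(long value, boolean isRetraction) {
        if (accumulator == null || count == null) {
            // Missing or inconsistent state: drop whatever half is left.
            accumulator = null;
            count = null;
            if (isRetraction) {
                return; // nothing to retract from
            }
            accumulator = 0L;
            count = 0L;
        }
        if (isRetraction) {
            accumulator -= value;
            count -= 1;
        } else {
            accumulator += value;
            count += 1;
        }
        if (count == 0) {
            // Clear both together so they can never diverge.
            accumulator = null;
            count = null;
        }
    }
}
```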
[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots
[ https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7970: Description: Currently, slots are allocated from the slot pool one by one, per execution. If a number of slots could be allocated from the slot pool in a batch, the slot pool could assign its cached slots according to the global slot requests of all executions and so make an optimal matching between slots and executions. This is especially useful for failover: for example, it can assign a slot to the execution whose state resides on that slot's machine. !https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg! was: Currently, slots are allocated from the slot pool one by one, per execution. If a number of slots could be allocated from the slot pool in a batch, the slot pool could assign its cached slots according to the global slot requests of all executions and so make an optimal matching between slots and executions. This is especially useful for failover: for example, it can assign a slot to the execution whose state resides on that slot's machine. !sp.jpg|thumbnail! > SlotPool support batch allocating slots > --- > > Key: FLINK-7970 > URL: https://issues.apache.org/jira/browse/FLINK-7970 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Attachments: sp.jpg > > > Currently, slots are allocated from the slot pool one by one, per execution. If a number of slots could be allocated from the slot pool in a batch, the slot pool could assign its cached slots according to the global slot requests of all executions and so make an optimal matching between slots and executions. This is especially useful for failover: for example, it can assign a slot to the execution whose state resides on that slot's machine. > !https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg!
[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots
[ https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7970: Attachment: sp.jpg
[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots
[ https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7970: Description: Currently, slots are allocated from the slot pool one by one, per execution. If a number of slots could be allocated from the slot pool in a batch, the slot pool could assign its cached slots according to the global slot requests of all executions and so make an optimal matching between slots and executions. This is especially useful for failover: for example, it can assign a slot to the execution whose state resides on that slot's machine. !sp.jpg|thumbnail! was: Currently, slots are allocated from the slot pool one by one, per execution. If a number of slots could be allocated from the slot pool in a batch, the slot pool could assign its cached slots according to the global slot requests of all executions and so make an optimal matching between slots and executions. This is especially useful for failover: for example, it can assign a slot to the execution whose state resides on that slot's machine.
[jira] [Created] (FLINK-7970) SlotPool support batch allocating slots
shuai.xu created FLINK-7970: --- Summary: SlotPool support batch allocating slots Key: FLINK-7970 URL: https://issues.apache.org/jira/browse/FLINK-7970 Project: Flink Issue Type: Improvement Components: JobManager Reporter: shuai.xu Assignee: shuai.xu Currently, slots are allocated from the slot pool one by one, per execution. If a number of slots could be allocated from the slot pool in a batch, the slot pool could assign its cached slots according to the global slot requests of all executions and so make an optimal matching between slots and executions. This is especially useful for failover: for example, it can assign a slot to the execution whose state resides on that slot's machine.
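A minimal sketch of what batch matching in the slot pool could look like, assuming each execution reports the host where its state lived before the failover and each cached slot knows its host. It uses a greedy two-pass assignment (local slots first, then any remaining slot); the types are hypothetical, and greedy matching is a simplification, not necessarily the optimal matching the issue has in mind.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchSlotMatcher {

    // Hypothetical request: an execution and the host holding its previous state.
    static final class Request {
        final String executionId;
        final String preferredHost;
        Request(String executionId, String preferredHost) {
            this.executionId = executionId;
            this.preferredHost = preferredHost;
        }
    }

    // Hypothetical cached slot in the pool.
    static final class Slot {
        final String slotId;
        final String host;
        Slot(String slotId, String host) {
            this.slotId = slotId;
            this.host = host;
        }
    }

    // Returns executionId -> slotId; executions that get no slot are absent.
    static Map<String, String> assign(List<Request> requests, List<Slot> cachedSlots) {
        List<Slot> free = new ArrayList<>(cachedSlots);
        Map<String, String> result = new LinkedHashMap<>();
        List<Request> unmatched = new ArrayList<>();

        // Pass 1: give each execution a slot on the host where its state is.
        for (Request r : requests) {
            Slot local = null;
            for (Iterator<Slot> it = free.iterator(); it.hasNext(); ) {
                Slot s = it.next();
                if (s.host.equals(r.preferredHost)) {
                    local = s;
                    it.remove();
                    break;
                }
            }
            if (local != null) {
                result.put(r.executionId, local.slotId);
            } else {
                unmatched.add(r);
            }
        }

        // Pass 2: hand the remaining slots to executions still waiting.
        for (Request r : unmatched) {
            if (free.isEmpty()) {
                break;
            }
            result.put(r.executionId, free.remove(0).slotId);
        }
        return result;
    }
}
```

With slots `[s2@h2, s1@h1]` and executions `e1` (state on `h3`) and `e2` (state on `h2`), serving requests one at a time in order could hand `s2` to `e1`; seeing the whole batch lets the pool keep `s2` for `e2`.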
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237111#comment-16237111 ] Sihua Zhou commented on FLINK-7866: --- [~till.rohrmann] Thanks, and I'm very happy that you thought of me when planning to revise the scheduler. I do have some thoughts about a proper scheduler. Working incrementally is a nice requirement, as you explained. In my opinion, it also needs the following features. 1. Strong extensibility: the scheduler should be easy to extend with other `schedule-factor`s, not just state and inputs, e.g. the TM's status and the cluster's status. 2. Consideration of the job's runtime information: when scheduling, we need to consider the previous runtime information of the execution. This should include not only the `task manager location`, but also the `state size`, `input flow rate` and `throughput`; I think these will be helpful for the scheduler. For example, imagine that vertices `A` and `B` both connect to vertex `C` with `forward`, and A's `throughput` was `1M/s` while B's `throughput` was `100M/s`; then B's slot would be picked for `C`. Currently, runtime information can only be filled in when we recover. Is there a chance that the scheduler could be self-regulating and change the scheduling result dynamically? In short, what I want to say is that the runtime information of executions is helpful. I am looking forward to your plan and am interested in it :) > Weigh list of preferred locations for scheduling > > > Key: FLINK-7866 > URL: https://issues.apache.org/jira/browse/FLINK-7866 > Project: Flink > Issue Type: Improvement > Components: Scheduler >Affects Versions: 1.4.0, 1.3.2 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > [~sihuazhou] proposed to not only use the list of preferred locations to decide where to schedule a task, but also to weigh the list according to how often a location appears and then select the location based on the weight. > That way, we would obtain better locality in some cases. > Example: > Preferred locations list: {{[location1, location2, location2]}} > Weighted preferred locations list: {{[(location2, 2), (location1, 1)]}}
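The weighting in the FLINK-7866 example can be computed by counting how often each location appears and sorting by that count, highest first. A minimal sketch with locations as plain strings; `PreferredLocations.weigh` is a hypothetical helper, and breaking ties by first appearance is an assumption the issue does not specify.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PreferredLocations {

    // Counts occurrences of each location and orders them by descending count.
    // LinkedHashMap keeps first-appearance order, and List.sort is stable,
    // so locations with equal counts stay in first-appearance order.
    static List<Map.Entry<String, Integer>> weigh(List<String> preferredLocations) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String location : preferredLocations) {
            counts.merge(location, 1, Integer::sum);
        }
        List<Map.Entry<String, Integer>> weighted = new ArrayList<>(counts.entrySet());
        weighted.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return weighted;
    }
}
```

For the input `[location1, location2, location2]`, `weigh` returns `[location2=2, location1=1]`, matching the weighted list in the issue.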
[jira] [Assigned] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-7969: --- Assignee: shuai.xu > Resource manager support batch request slots > > > Key: FLINK-7969 > URL: https://issues.apache.org/jira/browse/FLINK-7969 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. > !attachment-name.jpg|thumbnail!
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Labels: flip-6 (was: )
[jira] [Created] (FLINK-7969) Resource manager support batch request slots
shuai.xu created FLINK-7969: --- Summary: Resource manager support batch request slots Key: FLINK-7969 URL: https://issues.apache.org/jira/browse/FLINK-7969 Project: Flink Issue Type: Improvement Components: ResourceManager Reporter: shuai.xu Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !attachment-name.jpg|thumbnail!
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! was: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail!
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! was: Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail!
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! was: !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail!
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Attachment: (was: rm.png)
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.jpg|thumbnail! was: Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.png|thumbnail!
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Attachment: rm.jpg
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Attachment: rm.png
[jira] [Updated] (FLINK-7969) Resource manager support batch request slots
[ https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7969: Description: Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !rm.png|thumbnail! was: Currently, the resource manager only supports requesting slots one by one. It would be better to support allocating slots in batches, so that it can make a global decision based on all resource requests. For example, it can decide how many slots should be put into one task manager. !attachment-name.jpg|thumbnail!
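With a whole batch of slot requests in hand, the resource manager can, for example, group the requests by resource profile and compute how many TaskManagers to start for each, instead of reacting to one request at a time. A minimal sketch, assuming every TaskManager offers the same fixed number of slots and hosts slots of a single profile; `BatchTaskManagerPlanner`, `slotsPerTaskManager` and the string profiles are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchTaskManagerPlanner {

    // Given all slot requests of a batch (one resource profile per request),
    // returns how many TaskManagers to start per profile.
    static Map<String, Integer> plan(List<String> slotRequestProfiles, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        Map<String, Integer> slotsByProfile = new LinkedHashMap<>();
        for (String profile : slotRequestProfiles) {
            slotsByProfile.merge(profile, 1, Integer::sum);
        }
        Map<String, Integer> taskManagers = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : slotsByProfile.entrySet()) {
            // Ceiling division: 5 slots at 2 slots per TaskManager -> 3 TaskManagers.
            int needed = (e.getValue() + slotsPerTaskManager - 1) / slotsPerTaskManager;
            taskManagers.put(e.getKey(), needed);
        }
        return taskManagers;
    }
}
```

For five requests of profile `A` and one of profile `B` with two slots per TaskManager, the plan is three TaskManagers for `A` and one for `B`; a per-request allocator would have no basis for packing the `A` slots together.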
[jira] [Commented] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236998#comment-16236998 ] ASF GitHub Bot commented on FLINK-7941: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4930 @zentol Thank you for the reminder, I've updated the Jackson imports. Thanks > Port SubtasksTimesHandler to new REST endpoint > -- > > Key: FLINK-7941 > URL: https://issues.apache.org/jira/browse/FLINK-7941 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > Port *SubtasksTimesHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236996#comment-16236996 ] ASF GitHub Bot commented on FLINK-7941: --- GitHub user zjureel reopened a pull request: https://github.com/apache/flink/pull/4930 [FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint ## What is the purpose of the change Port SubtasksTimesHandler to new REST endpoint ## Brief change log - *Add new SubtasksTimesHandler class* - *Add SubtasksTimesInfo/SubtaskTimesHeaders* ## Verifying this change This change added tests and can be verified as follows: - *Add SubtasksTimesInfoTest for testing json response* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjureel/flink FLINK-7941 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4930.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4930 commit 1dfc1131fd5325a0f2284237657d3027f19e4fcf Author: zjureel Date: 2017-11-03T02:27:27Z [FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint > Port SubtasksTimesHandler to new REST endpoint > -- > > Key: FLINK-7941 > URL: https://issues.apache.org/jira/browse/FLINK-7941 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > Port *SubtasksTimesHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236994#comment-16236994 ] ASF GitHub Bot commented on FLINK-7941: --- Github user zjureel closed the pull request at: https://github.com/apache/flink/pull/4930 > Port SubtasksTimesHandler to new REST endpoint > -- > > Key: FLINK-7941 > URL: https://issues.apache.org/jira/browse/FLINK-7941 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > Port *SubtasksTimesHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins
[ https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236751#comment-16236751 ] Elias Levy commented on FLINK-6243: --- [~StephanEwen] anything to review? > Continuous Joins: True Sliding Window Joins > > > Key: FLINK-6243 > URL: https://issues.apache.org/jira/browse/FLINK-6243 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.1.4 >Reporter: Elias Levy >Priority: Major > > Flink defines sliding window joins as the join of elements of two streams > that share a window of time, where the windows are defined by advancing them > forward some amount of time that is less than the window time span. More > generally, such windows are just overlapping hopping windows. > Other systems, such as Kafka Streams, support a different notion of sliding > window joins. In these systems, an element of one stream is joined with an element of the other if the > absolute time difference between them is less than or equal to the time window > length. > This alternate notion of sliding window joins has some advantages over the current > implementation in some applications. > Elements to be joined may both fall within multiple overlapping sliding > windows, leading them to be joined multiple times, when we only wish them to > be joined once. > The implementation need not instantiate window objects to keep track of > stream elements, which becomes problematic in the current implementation if > the window size is very large and the slide is very small. > It allows for asymmetric time joins. E.g. join if elements from stream A are > no more than X time behind and no more than Y time ahead of an element from stream B. > It is currently possible to implement a join with these semantics using > {{CoProcessFunction}}, but the capability should be a first class feature, > such as it is in Kafka Streams. 
> To perform the join, elements of each stream must be buffered for at least > the window time length.
To allow for large window sizes and a high volume of > elements, the state should (possibly optionally) be buffered such that it can > spill to disk (e.g. by using RocksDB). > The same stream may be joined multiple times in a complex topology. As an > optimization, it may be wise to reuse any element buffer among colocated join > operators. Otherwise, there may be write amplification and increased state that > must be snapshotted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
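The alternate semantics described above (element a of stream A joins element b of stream B if a is at most X behind and at most Y ahead of b) can be sketched in plain Java. This is an in-memory illustration with hypothetical names (`IntervalJoin`, `Event`), not a Flink operator and not the `CoProcessFunction` approach the issue mentions. It assumes a watermark guarantees that no later element on either stream carries a smaller timestamp, which is what makes eviction safe.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory interval join: a (from A) joins b (from B) if
 * b.ts - lowerBound <= a.ts <= b.ts + upperBound. Each matching pair is emitted
 * exactly once, however many hopping windows would contain both elements.
 */
final class IntervalJoin {

    /** A timestamped element; the payload is a plain String for brevity. */
    static final class Event {
        final long ts;
        final String value;

        Event(long ts, String value) {
            this.ts = ts;
            this.value = value;
        }
    }

    private final long lowerBound; // X: how far A may lag behind B
    private final long upperBound; // Y: how far A may run ahead of B
    private final List<Event> bufferA = new ArrayList<>();
    private final List<Event> bufferB = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    IntervalJoin(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    private boolean matches(Event a, Event b) {
        return a.ts >= b.ts - lowerBound && a.ts <= b.ts + upperBound;
    }

    /** Probes the other side's buffer, emits matches, then buffers the new element. */
    void onA(Event a) {
        for (Event b : bufferB) {
            if (matches(a, b)) output.add(a.value + "," + b.value);
        }
        bufferA.add(a);
    }

    void onB(Event b) {
        for (Event a : bufferA) {
            if (matches(a, b)) output.add(a.value + "," + b.value);
        }
        bufferB.add(b);
    }

    /** Drops elements that can no longer match any element with ts >= watermark. */
    void advanceWatermark(long watermark) {
        // a can only match a future b if b.ts <= a.ts + lowerBound
        bufferA.removeIf(a -> a.ts + lowerBound < watermark);
        // b can only match a future a if a.ts <= b.ts + upperBound
        bufferB.removeIf(b -> b.ts + upperBound < watermark);
    }

    List<String> output() {
        return output;
    }

    int bufferedElements() {
        return bufferA.size() + bufferB.size();
    }
}
```

With X = Y this reduces to the symmetric case where the absolute time difference must not exceed the window length. A production version would keep the buffers in (possibly RocksDB-backed) keyed state and evict via event-time timers; both are omitted here.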
[jira] [Assigned] (FLINK-7339) aggregationToString fails when user defined aggregation contains constants
[ https://issues.apache.org/jira/browse/FLINK-7339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-7339: Assignee: Fabian Hueske > aggregationToString fails when user defined aggregation contains constants > -- > > Key: FLINK-7339 > URL: https://issues.apache.org/jira/browse/FLINK-7339 > Project: Flink > Issue Type: Bug > Components: DataStream API, Table API & SQL >Affects Versions: 1.3.1 >Reporter: Stefano Bortoli >Assignee: Fabian Hueske >Priority: Major > > Issue related to FLINK-7338, when the user defined aggregation contains a > constant it breaks the aggregation translation to string, which are mapped 1 > to 1 to the input fields. > OverAggregates.scala aggregationToString function fails to find a parameter > of the function among the input fields, and therefore throws a > RuntimeException. > {code} > private[flink] def aggregationToString( > inputType: RelDataType, > rowType: RelDataType, > namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = { > val inFields = inputType.getFieldNames.asScala > val outFields = rowType.getFieldNames.asScala > val aggStrings = namedAggregates.map(_.getKey).map( > a => s"${a.getAggregation}(${ > if (a.getArgList.size() > 0) { > // ERROR HAPPENS HERE! > a.getArgList.asScala.map(inFields(_)).mkString(", ") > } else { > "*" > } > })") > (inFields ++ aggStrings).zip(outFields).map { > case (f, o) => if (f == o) { > f > } else { > s"$f AS $o" > } > }.mkString(", ") > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
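The failing line maps every argument index of the aggregate call into `inFields`, so an argument that refers to a constant has no matching input field. The Java sketch below shows one way to render such arguments, assuming (as the FLINK-7338 fix does for the lower bound) that indices at or beyond the input field count address a list of constants. `AggArgsFormatter` and `formatArgs` are hypothetical, and plain lists stand in for Calcite's `AggregateCall` and `RelDataType`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the argument-rendering step of aggregationToString. */
final class AggArgsFormatter {

    private AggArgsFormatter() {}

    /**
     * Renders the argument list of an aggregate call. Indices below the input field
     * count name an input field; larger indices are assumed to address the constants,
     * offset by the field count.
     */
    static String formatArgs(List<Integer> argIndices, List<String> inFields, List<Object> constants) {
        if (argIndices.isEmpty()) {
            return "*";
        }
        List<String> rendered = new ArrayList<>();
        for (int idx : argIndices) {
            if (idx < inFields.size()) {
                rendered.add(inFields.get(idx));
            } else {
                // An out-of-range lookup in inFields is where the reported failure occurs.
                rendered.add(String.valueOf(constants.get(idx - inFields.size())));
            }
        }
        return String.join(", ", rendered);
    }
}
```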
[jira] [Commented] (FLINK-7338) User Defined aggregation with constants causes error under in lowerbound over window extraction
[ https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236687#comment-16236687 ] Fabian Hueske commented on FLINK-7338: -- Thank you [~stefano.bortoli] for reporting the bug and providing the fix! I'll fix this for the 1.4.0 release. Thanks, Fabian > User Defined aggregation with constants causes error under in lowerbound over > window extraction > --- > > Key: FLINK-7338 > URL: https://issues.apache.org/jira/browse/FLINK-7338 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Stefano Bortoli >Assignee: Fabian Hueske >Priority: Critical > > A user defined aggregation that passes a constant among the arguments causes > a RuntimeException extracting the lower boundary over window. > {code} > val sqlQuery = "SELECT a, " + > " myAgg(a, CAST('1' as BIGINT)) "+ > " OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND >PRECEDING AND CURRENT ROW) " + > "FROM MyTable" > {code} > The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala > we do : field count - lower bound index > -- which causes a -1 get, and subsequent RuntimeException. > We should do: lower bound offset - field count to find the value in the > constant array. > The code below should fix the problem. > {code} > private[flink] def getLowerBoundary( > logicWindow: Window, > overWindow: Group, > input: RelNode): Long = { > val ref: RexInputRef = > overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef] > val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount > val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2 > lowerBound match { > case x: java.math.BigDecimal => > x.asInstanceOf[java.math.BigDecimal].longValue() > case _ => lowerBound.asInstanceOf[Long] > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
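The proposed fix can be transcribed into Java to make the index arithmetic explicit. In this sketch the Calcite types (`RexInputRef`, `Window`) are replaced by plain integers and a `List<Object>` of constants, an assumption made so the example runs without Flink or Calcite on the classpath; `LowerBoundary` is a hypothetical name. With three input fields, one aggregate constant, and the lower-bound reference at index 4, the old `fieldCount - index` gives the reported -1, while `index - fieldCount` gives 1.

```java
import java.math.BigDecimal;
import java.util.List;

/** Hypothetical Java rendering of the getLowerBoundary fix proposed above. */
final class LowerBoundary {

    private LowerBoundary() {}

    /**
     * refIndex is the input reference of the window's lower bound. Constants are
     * numbered after the input fields, so the offset into the constant list is
     * refIndex - inputFieldCount (the buggy version subtracted the other way round).
     */
    static long lowerBoundary(int refIndex, int inputFieldCount, List<Object> constants) {
        int constantIndex = refIndex - inputFieldCount;
        Object lowerBound = constants.get(constantIndex);
        if (lowerBound instanceof BigDecimal) {
            return ((BigDecimal) lowerBound).longValue();
        }
        return (Long) lowerBound;
    }
}
```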
[jira] [Assigned] (FLINK-7338) User Defined aggregation with constants causes error under in lowerbound over window extraction
[ https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-7338: Assignee: Fabian Hueske > User Defined aggregation with constants causes error under in lowerbound over > window extraction > --- > > Key: FLINK-7338 > URL: https://issues.apache.org/jira/browse/FLINK-7338 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Stefano Bortoli >Assignee: Fabian Hueske >Priority: Critical > > A user defined aggregation that passes a constant among the arguments causes > a RuntimeException extracting the lower boundary over window. > {code} > val sqlQuery = "SELECT a, " + > " myAgg(a, CAST('1' as BIGINT)) "+ > " OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND >PRECEDING AND CURRENT ROW) " + > "FROM MyTable" > {code} > The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala > we do : field count - lower bound index > -- which causes a -1 get, and subsequent RuntimeException. > We should do: lower bound offset - field count to find the value in the > constant array. > The code below should fix the problem. > {code} > private[flink] def getLowerBoundary( > logicWindow: Window, > overWindow: Group, > input: RelNode): Long = { > val ref: RexInputRef = > overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef] > val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount > val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2 > lowerBound match { > case x: java.math.BigDecimal => > x.asInstanceOf[java.math.BigDecimal].longValue() > case _ => lowerBound.asInstanceOf[Long] > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state
[ https://issues.apache.org/jira/browse/FLINK-7968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236615#comment-16236615 ] ASF GitHub Bot commented on FLINK-7968: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4938 > Deduplicate serializer classes between runtime and queryable state > -- > > Key: FLINK-7968 > URL: https://issues.apache.org/jira/browse/FLINK-7968 > Project: Flink > Issue Type: Bug > Components: Queryable State >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > > Some serializer classes were duplicated into {{flink-queryable-state}} to > avoid a dependency on {{flink-runtime}}. > The proper solution here is to move the classes to the shared {{flink-core}} > project, because these classes are actually useful in a series of API > utilities and they do not have any dependency on other flink classes at all. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state
[ https://issues.apache.org/jira/browse/FLINK-7968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236601#comment-16236601 ] ASF GitHub Bot commented on FLINK-7968: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4938 +1 > Deduplicate serializer classes between runtime and queryable state > -- > > Key: FLINK-7968 > URL: https://issues.apache.org/jira/browse/FLINK-7968 > Project: Flink > Issue Type: Bug > Components: Queryable State >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > > Some serializer classes were duplicated into {{flink-queryable-state}} to > avoid a dependency on {{flink-runtime}}. > The proper solution here is to move the classes to the shared {{flink-core}} > project, because these classes are actually useful in a series of API > utilities and they do not have any dependency on other flink classes at all. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236587#comment-16236587 ] ASF GitHub Bot commented on FLINK-4228: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r148656504 --- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml --- @@ -182,6 +182,21 @@ under the License. ${project.version} test + --- End diff -- Would be great if we can avoid adding these dependencies. This couples projects that were really meant to be independent, even if just in test scope. If this is about testing recursive upload, can this be written properly as a test case in this project? Or can the Yarn upload test be completely in the yarn test project, adding a dependency on this s3 project? > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at 
java.io.FileInputStream.<init>(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
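The idea mentioned above (create folders manually and copy only actual files) can be sketched with `java.nio.file`. Here a local `Path` target stands in for the remote file system; the actual patch works against Hadoop/Flink `FileSystem` classes, whose calls are not reproduced here, and `RecursiveUpload`/`uploadRecursively` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

/** Hypothetical recursive upload: create directories explicitly, copy only regular files. */
final class RecursiveUpload {

    private RecursiveUpload() {}

    /** Copies the tree under localDir into targetDir and returns the number of files copied. */
    static int uploadRecursively(Path localDir, Path targetDir) throws IOException {
        int copied = 0;
        try (Stream<Path> paths = Files.walk(localDir)) {
            // Files.walk visits a directory before its entries, so parents exist before files.
            for (Path source : (Iterable<Path>) paths::iterator) {
                // toString() keeps this working if target and source use different file system providers.
                Path target = targetDir.resolve(localDir.relativize(source).toString());
                if (Files.isDirectory(source)) {
                    Files.createDirectories(target);
                } else if (Files.isRegularFile(source)) {
                    Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
                    copied++;
                }
            }
        }
        return copied;
    }
}
```

Walking the tree and creating directories explicitly avoids relying on the target file system to copy a whole directory in one call, which is the step reported to fail with S3A.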
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236588#comment-16236588 ] ASF GitHub Bot commented on FLINK-4228: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r148656918 --- Diff: flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java --- @@ -57,11 +62,52 @@ private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY"); private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY"); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @BeforeClass - public static void checkIfCredentialsArePresent() { + public static void checkCredentialsAndSetup() throws IOException { + // check whether credentials exist Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null); Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null); Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null); + + // initialize configuration with valid credentials --- End diff -- I would suggest moving this out of the "setup" method into the actual test. The setup logic is not shared (all other test methods don't assume that setup) and it also assumes existence of functionality that is tested in other test methods. > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem.
If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. > {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > 
/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.<init>(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be
[jira] [Assigned] (FLINK-7421) AvroRow(De)serializationSchema not serializable
[ https://issues.apache.org/jira/browse/FLINK-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-7421: Assignee: Fabian Hueske (was: Timo Walther) > AvroRow(De)serializationSchema not serializable > --- > > Key: FLINK-7421 > URL: https://issues.apache.org/jira/browse/FLINK-7421 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors, Table API & SQL >Reporter: Timo Walther >Assignee: Fabian Hueske >Priority: Critical > > Both {{AvroRowDeserializationSchema}} and {{AvroRowSerializationSchema}} > contain fields that are not serializable. Those fields should be made > transient and both schemas need to be tested in practice. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
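The remedy named in the issue (make the non-serializable fields transient) is usually paired with lazy re-initialization after deserialization. The Java sketch below uses made-up classes (`ExampleRowSchema`, `NonSerializableCodec`) rather than the actual Avro schema classes, whose fields are not listed in the issue; `roundTrip` mimics plain Java serialization of the schema object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a helper (e.g. an Avro writer) that is not Serializable. */
final class NonSerializableCodec {
    String encode(String s) {
        return "<" + s + ">";
    }
}

/** Hypothetical schema: ships only serializable config and rebuilds the helper lazily. */
final class ExampleRowSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String schemaString;            // serializable description, travels with the object
    private transient NonSerializableCodec codec; // skipped by Java serialization

    ExampleRowSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    String serialize(String row) {
        if (codec == null) {
            codec = new NonSerializableCodec(); // recreated after deserialization
        }
        return codec.encode(schemaString + ":" + row);
    }

    /** Serializes and deserializes an object with plain Java serialization. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

Without the `transient` keyword, `writeObject` would fail with a `NotSerializableException` as soon as `codec` has been initialized.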
[jira] [Assigned] (FLINK-6226) ScalarFunction and TableFunction do not support parameters of byte, short and float
[ https://issues.apache.org/jira/browse/FLINK-6226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-6226: Assignee: Fabian Hueske (was: Zhuoluo Yang) > ScalarFunction and TableFunction do not support parameters of byte, short and > float > --- > > Key: FLINK-6226 > URL: https://issues.apache.org/jira/browse/FLINK-6226 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Fabian Hueske >Priority: Major > > ScalarFunction and TableFunction do not seem to support parameters > of type byte, short, or float. > They throw exceptions like the following: > {panel} > org.apache.flink.table.api.ValidationException: Given parameters of function > 'org$apache$flink$table$expressions$utils$Func18$$98a126fbdab73f43d640516da603291a' > do not match any signature. > Actual: (java.lang.String, java.lang.Integer, java.lang.Integer, > java.lang.Integer, java.lang.Long) > Expected: (java.lang.String, byte, short, int, long) > at > org.apache.flink.table.functions.utils.ScalarSqlFunction$$anon$1.inferReturnType(ScalarSqlFunction.scala:82) > at > org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:469) > at > org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:271) > at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:518) > at > org.apache.flink.table.expressions.ScalarFunctionCall.toRexNode(call.scala:68) > at > org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:76) > at > org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:95) > at > org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:95) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at >
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) > at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.table.plan.logical.Project.construct(operators.scala:95) > at > org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77) > at org.apache.flink.table.api.Table.getRelNode(table.scala:72) > at > org.apache.flink.table.expressions.utils.ExpressionTestBase.addTableApiTestExpr(ExpressionTestBase.scala:215) > at > org.apache.flink.table.expressions.utils.ExpressionTestBase.testAllApis(ExpressionTestBase.scala:241) > at > org.apache.flink.table.expressions.UserDefinedScalarFunctionTest.testVariableArgs(UserDefinedScalarFunctionTest.scala:240) > {panel} > The test code looks like the following: > {code:java} > object Func18 extends ScalarFunction { > def eval(a: String, b: Byte, c: Short, d: Int, e: Long): String = { > a + "," + b + "," + c + "," + d + "," + e > } > } > class TableFunc4 extends TableFunction[Row] { > def eval(data: String, tinyInt: Byte, smallInt: Short, int: Int, long: > Long): Unit = { > val row = new Row(5) > row.setField(0, data) > row.setField(1, tinyInt) > row.setField(2, smallInt) > row.setField(3, int) > row.setField(4, long) > collect(row) > } > override def getResultType: TypeInformation[Row] = { > new RowTypeInfo( > BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.BYTE_TYPE_INFO, > BasicTypeInfo.SHORT_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.LONG_TYPE_INFO > ) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
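The exception compares the actual argument classes (`Integer` for the byte and short arguments) with the declared primitive parameter types. The Java sketch below reproduces that mismatch with reflection, under the assumption of a strict boxed-class equality check; it is not Flink's actual `ScalarSqlFunction` signature-matching logic, and `SignatureCheck`/`strictlyMatches` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reproduction of a strict signature check like the one reported above. */
final class SignatureCheck {

    private static final Map<Class<?>, Class<?>> BOXED = new HashMap<>();

    static {
        BOXED.put(byte.class, Byte.class);
        BOXED.put(short.class, Short.class);
        BOXED.put(int.class, Integer.class);
        BOXED.put(long.class, Long.class);
        BOXED.put(float.class, Float.class);
        BOXED.put(double.class, Double.class);
    }

    private SignatureCheck() {}

    /** Same parameter shape as Func18.eval in the issue. */
    static String eval(String a, byte b, short c, int d, long e) {
        return a + "," + b + "," + c + "," + d + "," + e;
    }

    /** True only if every actual argument class equals the boxed declared parameter class. */
    static boolean strictlyMatches(Method m, Class<?>... actual) {
        Class<?>[] expected = m.getParameterTypes();
        if (expected.length != actual.length) {
            return false;
        }
        for (int i = 0; i < expected.length; i++) {
            Class<?> boxed = BOXED.getOrDefault(expected[i], expected[i]);
            if (!boxed.equals(actual[i])) {
                return false; // e.g. Integer supplied where byte (Byte) is declared
            }
        }
        return true;
    }
}
```

Under such a check, the "Actual" types from the exception, `(String, Integer, Integer, Integer, Long)`, are rejected at the byte and short positions, while `(String, Byte, Short, Integer, Long)` would be accepted.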
[GitHub] flink issue #4794: [build][minor] Add missing licenses
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4794 +1 ---
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236512#comment-16236512 ] ASF GitHub Bot commented on FLINK-4228: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4939 [FLINK-4228][yarn/s3a] fix yarn resource upload s3a defaultFs ## What is the purpose of the change If YARN is configured to use the `s3a` default file system, upload of the Flink jars will fail since its `org.apache.hadoop.fs.FileSystem#copyFromLocalFile()` does not work recursively on the given `lib` folder. ## Brief change log - implement our own recursive upload (based on #2288) - add unit tests to verify its behaviour for both `hdfs://` and `s3://` (via S3A) resource uploads ## Verifying this change This change added tests and can be verified as follows: - added a unit test for HDFS uploads via our `MiniDFSCluster` - added integration test to verify S3 uploads (via the S3A filesystem implementation of the `flink-s3-fs-hadoop` sub-project) - manually verified the test on YARN with both S3A and HDFS default file systems being set ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes - internally) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-4228 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4939.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4939 commit 5d31f41e0e480820e9fec1efa84e5725364a136d Author: Nico Kruber Date: 2017-11-02T18:38:48Z [hotfix][s3] fix HadoopS3FileSystemITCase leaving test directories behind in S3 commit bf47d376397a8e64625a031468d5f5d0a5486238 Author: Nico Kruber Date: 2016-11-09T20:04:50Z [FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs + includes a new unit tests for recursive uploads to hfds:// targets + add a unit test for recursive file uploads to s3:// via s3a > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.
[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4939 [FLINK-4228][yarn/s3a] fix yarn resource upload s3a defaultFs ---
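The FLINK-4228 change replaces the non-recursive `copyFromLocalFile()` call with a hand-rolled recursive upload of the `lib` folder. The following is a minimal sketch of that idea. It assumes a hypothetical `Uploader` callback standing in for a single-file copy to the target file system; it does not use Hadoop's `FileSystem` API. It walks a local directory, uploads each regular file individually, and derives each remote path from the file's path relative to the source root. The `s3a://bucket/...` URI is illustrative only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RecursiveUploadSketch {

    // Hypothetical single-file upload hook (e.g. a non-recursive copy to S3).
    interface Uploader {
        void upload(Path localFile, String remotePath) throws IOException;
    }

    // Uploads every regular file below localRoot to remoteRoot, preserving the
    // relative directory layout. Returns the remote paths that were written.
    static List<String> uploadRecursively(Path localRoot, String remoteRoot, Uploader uploader)
            throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(localRoot)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<String> written = new ArrayList<>();
        for (Path file : files) {
            // Object-store keys always use '/' as the separator.
            String relative = localRoot.relativize(file).toString().replace('\\', '/');
            String remote = remoteRoot.endsWith("/") ? remoteRoot + relative : remoteRoot + "/" + relative;
            uploader.upload(file, remote);
            written.add(remote);
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        Path lib = Files.createTempDirectory("lib");
        Files.createDirectories(lib.resolve("sub"));
        Files.writeString(lib.resolve("a.jar"), "a");
        Files.writeString(lib.resolve("sub").resolve("b.jar"), "b");
        List<String> uploaded = uploadRecursively(lib, "s3a://bucket/flink/lib",
                (local, remote) -> System.out.println(local + " -> " + remote));
        System.out.println(uploaded);
    }
}
```

Issuing one upload per file keeps the approach independent of whether a given `FileSystem` implementation supports recursive copies.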
[jira] [Updated] (FLINK-7961) Docker-Flink with Docker Swarm doesn't work when machines are in different clouds
[ https://issues.apache.org/jira/browse/FLINK-7961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-7961: -- Description: Task Managers can't find Job Manager by name. Maybe some additional Docker configuration is needed? I am running the standard setup and create-docker-swarm-service.sh script from the Docker Flink project: https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/create-docker-swarm-service.sh This is the log from one of the Task Manager's containers: {noformat} Starting Task Manager config file: jobmanager.rpc.address: flink-jobmanager jobmanager.rpc.port: 6123 jobmanager.heap.mb: 1024 taskmanager.heap.mb: 1024 taskmanager.numberOfTaskSlots: 2 taskmanager.memory.preallocate: false parallelism.default: 1 jobmanager.web.port: 8081 blob.server.port: 6124 query.server.port: 6125 Starting taskmanager as a console application on host c42a6093f7bb. 2017-11-01 11:20:51,459 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 2017-11-01 11:20:51,522 INFO org.apache.flink.runtime.taskmanager.TaskManager - 2017-11-01 11:20:51,522 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC) 2017-11-01 11:20:51,522 INFO org.apache.flink.runtime.taskmanager.TaskManager - Current user: flink 2017-11-01 11:20:51,522 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15 2017-11-01 11:20:51,522 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 1024 MiBytes 2017-11-01 11:20:51,522 INFO org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: /docker-java-home/jre 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms1024M 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx1024M 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 2017-11-01 11:20:51,526 INFO org.apache.flink.runtime.taskmanager.TaskManager - Program Arguments: 2017-11-01 11:20:51,527 INFO org.apache.flink.runtime.taskmanager.TaskManager - --configDir 2017-11-01 11:20:51,527 INFO org.apache.flink.runtime.taskmanager.TaskManager - /opt/flink/conf 2017-11-01 11:20:51,527 INFO 
org.apache.flink.runtime.taskmanager.TaskManager - Classpath: /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar::: 2017-11-01 11:20:51,527 INFO org.apache.flink.runtime.taskmanager.TaskManager - 2017-11-01 11:20:51,528 INFO org.apache.flink.runtime.taskmanager.TaskManager - Registered UNIX signal handlers for [TERM, HUP, INT] 2017-11-01 11:20:51,532 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum number of open file descriptors is 1048576 2017-11-01 11:20:51,548 INFO org.apache.flink.runtime.taskmanager.TaskManager - Loading configuration from /opt/flink/conf 2017-11-01 11:20:51,551 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, flink-jobmanager 2017-11-01 11:20:51,551 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.port, 6123 2017-11-01 11:20:51,551 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.heap.mb, 1024 2017-11-01 11:20:51,551 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.mb, 1024 2017-
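The Task Manager log shows `jobmanager.rpc.address: flink-jobmanager`. This means each Task Manager locates the Job Manager through a service name that the Swarm network must resolve. A first diagnostic is to check whether that name resolves at all from inside a Task Manager container, assuming a JVM can be run there. The class below is an illustrative sketch. Only the host name `flink-jobmanager` comes from the log; the class and method names are hypothetical.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

class ResolveJobManager {

    // Returns the resolved addresses for host, or an empty array if the
    // name cannot be resolved from this machine or container.
    static InetAddress[] resolve(String host) {
        try {
            return InetAddress.getAllByName(host);
        } catch (UnknownHostException e) {
            return new InetAddress[0];
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "flink-jobmanager";
        InetAddress[] addresses = resolve(host);
        if (addresses.length == 0) {
            System.out.println("Cannot resolve " + host);
        } else {
            System.out.println(host + " -> " + Arrays.toString(addresses));
        }
    }
}
```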
[jira] [Commented] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object
[ https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236479#comment-16236479 ] Greg Hogan commented on FLINK-7947: --- Do these changes necessitate modifying the {{@Public}} API? > Let ParameterTool return a dedicated GlobalJobParameters object > --- > > Key: FLINK-7947 > URL: https://issues.apache.org/jira/browse/FLINK-7947 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Bowen Li >Priority: Major > > The {{ParameterTool}} directly implements the {{GlobalJobParameters}} > interface. Additionally, it has grown over time to not only store the > configuration parameters but also record which parameters have been > requested and which default values were set. This information is irrelevant on > the server side when setting a {{GlobalJobParameters}} object via > {{ExecutionConfig#setGlobalJobParameters}}. > Since we don't separate the {{ParameterTool}} logic from the actual data view, > users ran into problems when reusing the same {{ParameterTool}} to start > multiple jobs concurrently (see FLINK-7943). I think it would be a much > clearer separation of concerns if we actually split the > {{GlobalJobParameters}} from the {{ParameterTool}}. > Furthermore, we should think about whether {{ParameterTool#get}} should have > side effects, as it does right now.
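The proposal separates the bookkeeping in `ParameterTool` (which keys were requested and which defaults were used) from the data that is shipped with the job. Below is a minimal sketch of that separation. It uses hypothetical `ParameterToolSketch` and `JobParametersSnapshot` classes, not Flink's API. `get` records lookups on the client-side tool, while each job receives an immutable copy of the key/value map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical immutable data view handed to a job
// (standing in for a dedicated GlobalJobParameters object).
final class JobParametersSnapshot {
    private final Map<String, String> data;

    JobParametersSnapshot(Map<String, String> data) {
        this.data = Collections.unmodifiableMap(new HashMap<>(data));
    }

    Map<String, String> toMap() {
        return data;
    }
}

// Hypothetical client-side tool that keeps the lookup bookkeeping.
final class ParameterToolSketch {
    private final Map<String, String> data;
    private final Set<String> requested = new HashSet<>();
    private final Map<String, String> defaults = new HashMap<>();

    ParameterToolSketch(Map<String, String> data) {
        this.data = new HashMap<>(data);
    }

    // Side effect: remembers the requested key and any default that was applied.
    synchronized String get(String key, String defaultValue) {
        requested.add(key);
        String value = data.get(key);
        if (value == null) {
            defaults.put(key, defaultValue);
            return defaultValue;
        }
        return value;
    }

    synchronized Set<String> requestedKeys() {
        return new HashSet<>(requested);
    }

    synchronized Map<String, String> defaultsUsed() {
        return new HashMap<>(defaults);
    }

    // Each job gets its own immutable snapshot, so reusing the tool to start
    // several jobs does not share mutable bookkeeping with any of them.
    synchronized JobParametersSnapshot toJobParameters() {
        return new JobParametersSnapshot(data);
    }
}
```

Whether `get` should keep its side effects at all is the open question raised in the issue. The sketch only keeps them off the object the job sees.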
[jira] [Commented] (FLINK-6226) ScalarFunction and TableFunction do not support parameters of byte, short and float
[ https://issues.apache.org/jira/browse/FLINK-6226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236297#comment-16236297 ] Fabian Hueske commented on FLINK-6226: -- I tried to reproduce the issue, but the provided UDFs work on the current master branch. It seems the issue was fixed in the meantime. I'd suggest modifying two existing test cases to use UDFs with {{byte}}, {{short}}, and {{float}} types. > ScalarFunction and TableFunction do not support parameters of byte, short and > float > --- > > Key: FLINK-6226 > URL: https://issues.apache.org/jira/browse/FLINK-6226 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang >Priority: Major > > It seems to be a problem that ScalarFunction and TableFunction do not support > types of byte, short or float.
[jira] [Commented] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state
[ https://issues.apache.org/jira/browse/FLINK-7968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236262#comment-16236262 ] ASF GitHub Bot commented on FLINK-7968: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4938 [FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 'flink-core' ## What is the purpose of the change This moves the classes `DataInputDeserializer` and `DataOutputSerializer` to `flink-core`, because these classes are used across different projects (`flink-runtime` and `flink-queryable-state`) and were previously duplicated. This is also needed for a future cleanup that scopes `SerializationSchema` properly. The move is straightforward because these classes have no dependencies on other Flink classes. ## Changelog - Move `DataInputDeserializer` and `DataOutputSerializer` to `flink-core` - Move associated tests and test utils to `flink-core` - Delete the duplicate serializers in `flink-queryable-state`. ## Verifying this change This change is a trivial rework / code cleanup which is covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented?
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink move_data_in_out Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4938.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4938 commit 6afd68b100edab4689a6e7a39b5aee7743e05853 Author: Stephan Ewen Date: 2017-11-02T16:19:19Z [hotfix] [tests] Improve TypeInfoTestCoverageTest commit 4d808a456d698253b2a77cc79e7d4386ceeab706 Author: Stephan Ewen Date: 2017-11-02T17:27:23Z [FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 'flink-core' These core flink utils are independent of any other runtime classes and are also used both in flink-runtime and in flink-queryable-state (which duplicated the code). > Deduplicate serializer classes between runtime and queryable state > -- > > Key: FLINK-7968 > URL: https://issues.apache.org/jira/browse/FLINK-7968 > Project: Flink > Issue Type: Bug > Components: Queryable State >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > > Some serializer classes were duplicated into {{flink-queryable-state}} to > avoid a dependency on {{flink-runtime}}. > The proper solution here is to move the classes to the shared {{flink-core}} > project, because these classes are actually useful in a series of API > utilities and they do not have any dependency on other Flink classes at all.
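The moved classes only turn values into bytes and back, so they need nothing beyond the JDK. That is why moving them to `flink-core` is straightforward. The sketch below illustrates such a dependency-free round trip with the JDK's `DataOutputStream` and `DataInputStream`, not with Flink's `DataOutputSerializer` and `DataInputDeserializer`. The class name `ByteArrayRoundTrip` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class ByteArrayRoundTrip {

    // Writes a (String, long) pair into a byte array using only JDK classes.
    static byte[] write(String key, long value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(64);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);   // 2-byte length prefix + modified UTF-8 bytes
            out.writeLong(value); // 8 bytes, big-endian
        }
        return bytes.toByteArray();
    }

    // Reads the pair back in the same order it was written.
    static String read(byte[] buffer) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer))) {
            String key = in.readUTF();
            long value = in.readLong();
            return key + "=" + value;
        }
    }
}
```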
[jira] [Updated] (FLINK-7421) AvroRow(De)serializationSchema not serializable
[ https://issues.apache.org/jira/browse/FLINK-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-7421: - Priority: Critical (was: Major) > AvroRow(De)serializationSchema not serializable > --- > > Key: FLINK-7421 > URL: https://issues.apache.org/jira/browse/FLINK-7421 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors, Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Both {{AvroRowDeserializationSchema}} and {{AvroRowSerializationSchema}} > contain fields that are not serializable. Those fields should be made > transient and both schemas need to be tested in practice.
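FLINK-7421 proposes marking the non-serializable fields `transient` and testing the schemas in practice. Below is a minimal sketch of that pattern. It assumes a hypothetical `RowSchemaSketch` whose `NonSerializableHelper` stands in for Avro's non-serializable reader/writer state. The helper is excluded from serialization and rebuilt lazily from a serializable schema string. The sketch uses only Java serialization, not the actual Avro classes. `roundTrip` performs the kind of serialize/deserialize check the issue asks for.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a non-serializable helper such as a datum reader/writer.
final class NonSerializableHelper {
    private final String schema;

    NonSerializableHelper(String schema) {
        this.schema = schema;
    }

    String describe() {
        return "helper for " + schema;
    }
}

final class RowSchemaSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable configuration that is shipped with the job.
    private final String schemaString;

    // Not serializable: skipped by Java serialization and rebuilt on demand.
    private transient NonSerializableHelper helper;

    RowSchemaSketch(String schemaString) {
        this.schemaString = schemaString;
    }

    NonSerializableHelper helper() {
        if (helper == null) {
            helper = new NonSerializableHelper(schemaString);
        }
        return helper;
    }

    // Serializes and deserializes an object, as happens when a function is
    // shipped to the cluster.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```

Without the `transient` modifier, `writeObject` would fail with a `NotSerializableException` once `helper` has been initialized.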
[GitHub] flink pull request #4938: [FLINK-7968] [core] Move DataOutputSerializer and ...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4938 [FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 'flink-core' ---
[jira] [Created] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state
Stephan Ewen created FLINK-7968: --- Summary: Deduplicate serializer classes between runtime and queryable state Key: FLINK-7968 URL: https://issues.apache.org/jira/browse/FLINK-7968 Project: Flink Issue Type: Bug Components: Queryable State Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.4.0 Some serializer classes were duplicated into {{flink-queryable-state}} to avoid a dependency on {{flink-runtime}}. The proper solution here is to move the classes to the shared {{flink-core}} project, because these classes are actually useful in a series of API utilities and they do not have any dependency on other Flink classes at all.
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236251#comment-16236251 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148609899 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -32,8 +46,11 @@ * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public class JobMetricsHandler extends AbstractMetricsHandler { +public class JobMetricsHandler extends AbstractMetricsHandler --- End diff -- Why aren't we simply implementing a new handler? I think that we don't reuse any of `AbstractMetricsHandler` functionality and, thus, there is no need to use it as a base class. > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > >
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236247#comment-16236247 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606039 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricMessageParameters.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for getting metrics. 
+ */ +public class MetricMessageParameters extends MessageParameters { --- End diff -- Let's extend from `JobMessageParameters` > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > >
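The reviewer suggests deriving `MetricMessageParameters` from `JobMessageParameters` instead of `MessageParameters`, so that the job ID path parameter is inherited rather than redeclared. Below is a simplified sketch of that inheritance. It uses hypothetical `*Sketch` classes that model parameters as plain string keys instead of Flink's typed `MessagePathParameter`/`MessageQueryParameter` objects. The keys `jobid` and `get` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for the REST message parameter base class.
abstract class MessageParametersSketch {
    abstract Collection<String> pathParameters();

    abstract Collection<String> queryParameters();
}

// Job-scoped parameters contribute the job ID path parameter once.
class JobMessageParametersSketch extends MessageParametersSketch {
    @Override
    Collection<String> pathParameters() {
        return Collections.singletonList("jobid");
    }

    @Override
    Collection<String> queryParameters() {
        return Collections.emptyList();
    }
}

// Metric parameters inherit the job ID path parameter and only add
// their own query parameter for selecting metrics.
class MetricMessageParametersSketch extends JobMessageParametersSketch {
    @Override
    Collection<String> queryParameters() {
        List<String> params = new ArrayList<>(super.queryParameters());
        params.add("get");
        return params;
    }
}
```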
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236250#comment-16236250 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148610350 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java --- @@ -120,6 +121,16 @@ public synchronized ComponentMetricStore getJobMetricStore(String jobID) { } /** +* Returns the {@link ComponentMetricStore} for the given job ID. +* +* @param jobID job ID +* @return ComponentMetricStore for the given ID, or null if no store for the given argument exists +*/ + public synchronized ComponentMetricStore getJobMetricStore(JobID jobID) { + return jobID == null ? null : ComponentMetricStore.unmodifiable(jobs.get(jobID.toString())); --- End diff -- `jobID` should not be nullable. > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > >
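Following the review comment that `jobID` should not be nullable, the lookup can reject `null` up front instead of silently returning `null`. It still returns `null` for an unknown job, as the Javadoc in the diff states. Below is a minimal sketch. It assumes a hypothetical `MetricStoreSketch` that keys metrics by job-ID strings and uses plain maps in place of `JobID` and `ComponentMetricStore`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified metric store keyed by job ID strings.
class MetricStoreSketch {
    private final Map<String, Map<String, String>> jobs = new HashMap<>();

    synchronized void add(String jobId, String metric, String value) {
        jobs.computeIfAbsent(jobId, k -> new HashMap<>()).put(metric, value);
    }

    // A null jobId is a programming error; an unknown job yields null.
    synchronized Map<String, String> getJobMetricStore(String jobId) {
        Objects.requireNonNull(jobId, "jobId must not be null");
        Map<String, String> metrics = jobs.get(jobId);
        // Return a read-only copy so callers cannot mutate the store.
        return metrics == null ? null : Collections.unmodifiableMap(new HashMap<>(metrics));
    }
}
```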
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236252#comment-16236252 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606599 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java --- @@ -65,15 +65,28 @@ public void getMapFor() throws Exception { assertEquals("2", metrics.get("abc.metric3")); assertEquals("3", metrics.get("abc.metric4")); + assertEquals( + "[" + + "{\"id\":\"abc.metric4\"}," + + "{\"id\":\"abc.metric3\"}" + + "]", + handler.getAvailableMetricsList(pathParams)); + assertEquals("", handler.getMetricsValues(pathParams, "")); + assertEquals( + "[" + + "{\"id\":\"abc.metric3\",\"value\":\"2\"}," + + "{\"id\":\"abc.metric4\",\"value\":\"3\"}" + + "]", + handler.getMetricsValues(pathParams, "abc.metric3,abc.metric4")); } @Test public void getMapForNull() { MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); --- End diff -- Please revert > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > >
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236246#comment-16236246 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606465 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsOverview.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * Response of metrics handlers, represented as a list of {@link MetricEntry}. + */ +public class MetricsOverview extends ArrayList implements ResponseBody { --- End diff -- Let's not directly extend from `ArrayList` but instead use composition. 
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236249#comment-16236249 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606524 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java --- @@ -50,10 +50,10 @@ public void testGetPaths() { @Test public void getMapFor() throws Exception { MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); --- End diff -- Please revert
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236248#comment-16236248 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148604013 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -304,22 +304,21 @@ public WebRuntimeMonitor( get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.LOG, - config)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.STDOUT, - config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.LOG, + config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config)); + get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher)); --- End diff -- why are you adding things to the old `WebRuntimeMonitor`?
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236254#comment-16236254 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606332 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsHeaders.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message header for metrics handler. + */ +public final class MetricsHeaders implements MessageHeaders { + + private static final MetricsHeaders INSTANCE = new MetricsHeaders(); + + public static final String PARAMETER_JOB_ID = "jobid"; --- End diff -- where is this field used? 
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236253#comment-16236253 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148609556 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -48,8 +65,43 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { MetricStore.ComponentMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { + return CompletableFuture.supplyAsync( + () -> { + fetcher.update(); + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + List requestedMetrics = request.getQueryParameter(MetricNameParameter.class); + return getMetricsOverview(jobID, requestedMetrics); + }, + executor); + } + + protected MetricsOverview getMetricsOverview(JobID jobID, List requestedMetrics) { + Map metricsMap = getMetricsMapByJobId(jobID, fetcher.getMetricStore()); + if (metricsMap == null) { + return new MetricsOverview(); + } + + if (requestedMetrics == null || requestedMetrics.isEmpty()) { + return new MetricsOverview( + metricsMap.entrySet().stream() + .map(e -> new MetricEntry(e.getKey(), e.getValue())) + .collect(Collectors.toList())); + } else { + return new MetricsOverview( + requestedMetrics.stream() + .filter(e -> metricsMap.get(e) != null) + .map(e -> new MetricEntry(e, metricsMap.get(e))) + .collect(Collectors.toList())); --- End diff -- I think that by not using Java streams we can avoid doing two `HashMap` lookups for every `e` in `requestedMetrics` and instead do it with a single lookup.
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236245#comment-16236245 ] ASF GitHub Bot commented on FLINK-7694: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148604078 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -304,22 +304,21 @@ public WebRuntimeMonitor( get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.LOG, - config)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.STDOUT, - config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.LOG, + config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config)); --- End diff -- Please revert formatting changes.
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606599 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java --- @@ -65,15 +65,28 @@ public void getMapFor() throws Exception { assertEquals("2", metrics.get("abc.metric3")); assertEquals("3", metrics.get("abc.metric4")); + assertEquals( + "[" + + "{\"id\":\"abc.metric4\"}," + + "{\"id\":\"abc.metric3\"}" + + "]", + handler.getAvailableMetricsList(pathParams)); + assertEquals("", handler.getMetricsValues(pathParams, "")); + assertEquals( + "[" + + "{\"id\":\"abc.metric3\",\"value\":\"2\"}," + + "{\"id\":\"abc.metric4\",\"value\":\"3\"}" + + "]", + handler.getMetricsValues(pathParams, "abc.metric3,abc.metric4")); } @Test public void getMapForNull() { MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); --- End diff -- Please revert ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606524 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java --- @@ -50,10 +50,10 @@ public void testGetPaths() { @Test public void getMapFor() throws Exception { MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); --- End diff -- Please revert ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606332 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsHeaders.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message header for metrics handler. + */ +public final class MetricsHeaders implements MessageHeaders { + + private static final MetricsHeaders INSTANCE = new MetricsHeaders(); + + public static final String PARAMETER_JOB_ID = "jobid"; --- End diff -- where is this field used? ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148609899 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -32,8 +46,11 @@ * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public class JobMetricsHandler extends AbstractMetricsHandler { +public class JobMetricsHandler extends AbstractMetricsHandler --- End diff -- Why aren't we simply implementing a new handler? I think that we don't reuse any of `AbstractMetricsHandler` functionality and, thus, there is no need to use it as a base class. ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148604013 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -304,22 +304,21 @@ public WebRuntimeMonitor( get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.LOG, - config)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.STDOUT, - config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.LOG, + config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config)); + get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher)); --- End diff -- why are you adding things to the old `WebRuntimeMonitor`? ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148609556 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java --- @@ -48,8 +65,43 @@ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { MetricStore.ComponentMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); - return job != null - ? job.metrics - : null; + return job != null ? job.metrics : null; + } + + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { + return CompletableFuture.supplyAsync( + () -> { + fetcher.update(); + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + List requestedMetrics = request.getQueryParameter(MetricNameParameter.class); + return getMetricsOverview(jobID, requestedMetrics); + }, + executor); + } + + protected MetricsOverview getMetricsOverview(JobID jobID, List requestedMetrics) { + Map metricsMap = getMetricsMapByJobId(jobID, fetcher.getMetricStore()); + if (metricsMap == null) { + return new MetricsOverview(); + } + + if (requestedMetrics == null || requestedMetrics.isEmpty()) { + return new MetricsOverview( + metricsMap.entrySet().stream() + .map(e -> new MetricEntry(e.getKey(), e.getValue())) + .collect(Collectors.toList())); + } else { + return new MetricsOverview( + requestedMetrics.stream() + .filter(e -> metricsMap.get(e) != null) + .map(e -> new MetricEntry(e, metricsMap.get(e))) + .collect(Collectors.toList())); --- End diff -- I think that by not using Java streams we can avoid doing two `HashMap` lookups for every `e` in `requestedMetrics` and instead do it with a single lookup. ---
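A minimal sketch of the single-lookup alternative, written as a plain loop. `MetricEntry` here is a simplified, hypothetical stand-in for the PR's class (an id/value pair), and `RequestedMetrics.select` is an illustrative helper name, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the PR's MetricEntry: an id/value pair.
final class MetricEntry {
    final String id;
    final String value;

    MetricEntry(String id, String value) {
        this.id = id;
        this.value = value;
    }
}

final class RequestedMetrics {
    private RequestedMetrics() {}

    // Builds entries for the requested metric names, skipping names the store
    // does not know. Each name costs exactly one Map.get call, whereas
    // filter(metricsMap.get(e) != null) followed by map(metricsMap.get(e)) costs two.
    static List<MetricEntry> select(Map<String, String> metricsMap, List<String> requestedMetrics) {
        List<MetricEntry> result = new ArrayList<>(requestedMetrics.size());
        for (String name : requestedMetrics) {
            String value = metricsMap.get(name);
            if (value != null) {
                result.add(new MetricEntry(name, value));
            }
        }
        return result;
    }
}
```

Like the original `filter(... != null)`, this assumes the map never stores `null` values, so a `null` from `get` reliably means the metric is absent.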
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606465 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsOverview.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * Response of metrics handlers, represented as a list of {@link MetricEntry}. + */ +public class MetricsOverview extends ArrayList implements ResponseBody { --- End diff -- Let's not directly extend from `ArrayList` but instead use composition. ---
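One way to follow the composition suggestion, sketched under stated assumptions: `ResponseBody` is redeclared as a local marker interface standing in for Flink's, `MetricEntry` is a simplified stand-in, and the accessor name `getMetrics` is hypothetical. A real REST response class would likely also need JSON serialization annotations, which are omitted here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Local stand-in for Flink's ResponseBody marker interface (assumption).
interface ResponseBody {}

// Simplified stand-in for the PR's MetricEntry.
final class MetricEntry {
    final String id;
    final String value;

    MetricEntry(String id, String value) {
        this.id = id;
        this.value = value;
    }
}

// Composition: the response holds a list of entries instead of being an ArrayList.
final class MetricsOverview implements ResponseBody {
    private final List<MetricEntry> metrics;

    // Empty response, e.g. when no metric store exists for the job.
    MetricsOverview() {
        this(Collections.<MetricEntry>emptyList());
    }

    // Takes a defensive copy and exposes it read-only.
    MetricsOverview(Collection<MetricEntry> metrics) {
        this.metrics = Collections.unmodifiableList(new ArrayList<>(metrics));
    }

    List<MetricEntry> getMetrics() {
        return metrics;
    }
}
```

Keeping the list private means callers cannot `add` or `remove` entries through an inherited `List` API, and the response type exposes only what it needs.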
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148606039 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricMessageParameters.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.metrics; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for getting metrics. + */ +public class MetricMessageParameters extends MessageParameters { --- End diff -- Let's extend from `JobMessageParameters` ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148604078 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -304,22 +304,21 @@ public WebRuntimeMonitor( get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.LOG, - config)); - get(router, - new TaskManagerLogHandler( - retriever, - scheduledExecutor, - localRestAddress, - timeout, - TaskManagerLogHandler.FileMode.STDOUT, - config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.LOG, + config)); + get(router, new TaskManagerLogHandler( + retriever, + scheduledExecutor, + localRestAddress, + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config)); --- End diff -- Please revert formatting changes. ---
[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4757#discussion_r148610350 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java --- @@ -120,6 +121,16 @@ public synchronized ComponentMetricStore getJobMetricStore(String jobID) { } /** +* Returns the {@link ComponentMetricStore} for the given job ID. +* +* @param jobID job ID +* @return ComponentMetricStore for the given ID, or null if no store for the given argument exists +*/ + public synchronized ComponentMetricStore getJobMetricStore(JobID jobID) { + return jobID == null ? null : ComponentMetricStore.unmodifiable(jobs.get(jobID.toString())); --- End diff -- `jobID` should not be nullable. ---
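If `jobID` is treated as a required argument, the lookup can fail fast instead of mapping a null ID to a null result, so that `null` only ever means "no store for this job". The sketch below uses simplified, hypothetical stand-ins: `JobId` replaces Flink's `JobID`, and plain `Map`s replace `ComponentMetricStore`. In Flink code the check would more typically be written with `Preconditions.checkNotNull`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for Flink's JobID; only toString() is used as the key.
final class JobId {
    private final String hex;

    JobId(String hex) {
        this.hex = Objects.requireNonNull(hex, "hex");
    }

    @Override
    public String toString() {
        return hex;
    }
}

// Hypothetical store keyed by the job ID's string form, as in the diff.
final class JobMetricStores {
    private final Map<String, Map<String, String>> jobs = new HashMap<>();

    synchronized void put(JobId jobID, Map<String, String> metrics) {
        jobs.put(Objects.requireNonNull(jobID, "jobID").toString(), metrics);
    }

    // Rejects a null jobID eagerly; returns null only if no store exists for the job.
    synchronized Map<String, String> getJobMetricStore(JobId jobID) {
        Objects.requireNonNull(jobID, "jobID must not be null");
        Map<String, String> store = jobs.get(jobID.toString());
        return store == null ? null : Collections.unmodifiableMap(store);
    }
}
```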
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236191#comment-16236191 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148599674 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java --- @@ -86,10 +85,16 @@ // CompletableFuture allocateSlot( - ScheduledUnit task, + AllocationID allocationID, ResourceProfile resources, Iterable locationPreferences, @RpcTimeout Time timeout); void returnAllocatedSlot(Slot slot); + + /** +* Cancel a slot allocation. +* This method should be called when the CompletableFuture returned by allocateSlot completed exceptionally. --- End diff -- Params description is missing. > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request remains in SlotPool. If a > new slot then registers with SlotPool, it may fulfill the outdated pending request > and be added to allocatedSlots, where it will never be used or > recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236185#comment-16236185 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148591926 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -262,23 +263,36 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - ScheduledUnit task, --- End diff -- We should not remove the `ScheduledUnit` parameter here.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236180#comment-16236180 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148571514 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); + + // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool + AllocationID allocationID3 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + ResourceID resourceID = ResourceID.generate(); + AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE); + slotPoolGateway.registerTaskManager(resourceID); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + assertEquals(0, pool.getNumOfPendingRequests()); + assertTrue(pool.getAllocatedSlots().contains(allocationID3)); + + pool.cancelSlotAllocation(allocationID3); + assertFalse(pool.getAllocatedSlots().contains(allocationID3)); + assertTrue(pool.getAvailableSlots().contains(allocationID3)); + } + + @Test + public 
void testProviderAndOw
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236182#comment-16236182 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570658 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); + + // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool --- End diff -- separate test > There may be allocatedSlots leak in SlotPool > > > Key: FLINK-6434 > URL: https://issues.apache.org/jira/browse/FLINK-6434 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > If the allocateSlot() call from Execution to SlotPool times out, the job > begins to fail over, but the pending request is still in the SlotPool. If a > new slot is then registered with the SlotPool, it may fulfill the outdated > pending request and be added to allocatedSlots, but it will never be used and > never be recycled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236184#comment-16236184 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148598542 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -1006,7 +1044,13 @@ public boolean returnAllocatedSlot(Slot slot) { Iterable locationPreferences = task.getTaskToExecute().getVertex().getPreferredLocations(); - return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout); + final AllocationID allocationID = new AllocationID(); + CompletableFuture slotFuture = gateway.allocateSlot(allocationID, ResourceProfile.UNKNOWN, locationPreferences, timeout); + slotFuture.exceptionally((Throwable failure) -> { --- End diff -- I think `slotFuture.whenComplete` would be a better fit here.
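Why `whenComplete` fits better: `exceptionally` builds a new future whose callback must produce a replacement value, while `whenComplete` is a pure side-effect hook that sees either the result or the failure and passes the original outcome through. The sketch below illustrates the pattern of cancelling an allocation when the slot future fails. `FakeSlotGateway` and `SlotRequester` are hypothetical stand-ins with `String` IDs, not Flink's actual `SlotPoolGateway` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the slot pool gateway; not Flink's actual API. */
class FakeSlotGateway {
    final List<String> cancelled = new ArrayList<>();
    // The single future handed out by allocateSlot, completed manually by the caller.
    final CompletableFuture<String> pending = new CompletableFuture<>();

    CompletableFuture<String> allocateSlot(String allocationId) {
        return pending;
    }

    void cancelSlotAllocation(String allocationId) {
        cancelled.add(allocationId);
    }
}

class SlotRequester {
    static CompletableFuture<String> requestSlot(FakeSlotGateway gateway, String allocationId) {
        CompletableFuture<String> slotFuture = gateway.allocateSlot(allocationId);
        // whenComplete only observes the outcome; unlike exceptionally() it does not
        // have to return a replacement value, and the caller still sees the failure.
        slotFuture.whenComplete((slot, failure) -> {
            if (failure != null) {
                // The allocation is no longer needed, so tell the pool to drop it.
                gateway.cancelSlotAllocation(allocationId);
            }
        });
        return slotFuture;
    }
}
```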
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236187#comment-16236187 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148596041 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -262,23 +263,36 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - ScheduledUnit task, + public CompletableFuture allocateSlot(AllocationID allocationID, ResourceProfile resources, Iterable locationPreferences, Time timeout) { - return internalAllocateSlot(task, resources, locationPreferences); + return internalAllocateSlot(allocationID, resources, locationPreferences); } @Override public void returnAllocatedSlot(Slot slot) { internalReturnAllocatedSlot(slot); } + @Override + public void cancelSlotAllocation(AllocationID allocationID) { + waitingForResourceManager.remove(allocationID); --- End diff -- We should fail the pending request properly, e.g. check whether the slot is in `waitingForResourceManager` or `pendingRequests`; if so, remove it and call `failPendingRequest`.
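The suggested cancellation logic can be sketched as follows: look the request up in both queues, remove it from whichever one holds it, and fail its future so that any waiter is notified instead of hanging. This is a simplified model under stated assumptions: `PendingRequestTracker` is hypothetical, requests are plain `CompletableFuture<String>` values keyed by `String` IDs, and the `failPendingRequest` helper here only borrows the name from the review; its signature is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of the two request queues discussed in the review. */
class PendingRequestTracker {
    // Requests queued while no ResourceManager is connected.
    final Map<String, CompletableFuture<String>> waitingForResourceManager = new HashMap<>();
    // Requests that have already been sent to the ResourceManager.
    final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();

    void cancelSlotAllocation(String allocationId) {
        CompletableFuture<String> request = waitingForResourceManager.remove(allocationId);
        if (request == null) {
            request = pendingRequests.remove(allocationId);
        }
        if (request != null) {
            failPendingRequest(request, new CancellationException("Allocation " + allocationId + " cancelled"));
        }
    }

    // Completing the future exceptionally notifies whoever still waits on it;
    // it is a no-op if the future has already completed.
    private static void failPendingRequest(CompletableFuture<String> request, Exception cause) {
        request.completeExceptionally(cause);
    }
}
```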
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236174#comment-16236174 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148571851 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java --- @@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) { } } - private static ResourceManagerGateway createResourceManagerGatewayMock() { + static ResourceManagerGateway createResourceManagerGatewayMock() { ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway - .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); + .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class))) + .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); --- End diff -- Why not return a proper `CompletableFuture` here?
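A real, already-completed future behaves like production code: callbacks chained with `thenApply` or `whenComplete` actually run, which a mocked `CompletableFuture` created with `RETURNS_MOCKS` does not guarantee. With Mockito this would presumably be `.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()))`, assuming `requestSlot` is declared to return `CompletableFuture<Acknowledge>`. The dependency-free sketch below uses a hand-written stub instead; `SlotRequestGateway` and `AcknowledgingGateway` are hypothetical and much smaller than Flink's `ResourceManagerGateway`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, trimmed-down gateway; Flink's ResourceManagerGateway has different types. */
interface SlotRequestGateway {
    CompletableFuture<String> requestSlot(String jobMasterId, String slotRequest);
}

/** Test stub that answers every request with a real, already-completed future. */
class AcknowledgingGateway implements SlotRequestGateway {
    int requests;

    @Override
    public CompletableFuture<String> requestSlot(String jobMasterId, String slotRequest) {
        requests++;
        // Unlike a mocked future, this one really completes, so dependent stages execute.
        return CompletableFuture.completedFuture("ack");
    }
}
```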
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236183#comment-16236183 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148571183 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); + + // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool + AllocationID allocationID3 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + ResourceID resourceID = ResourceID.generate(); + AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE); + slotPoolGateway.registerTaskManager(resourceID); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + assertEquals(0, pool.getNumOfPendingRequests()); + assertTrue(pool.getAllocatedSlots().contains(allocationID3)); + + pool.cancelSlotAllocation(allocationID3); + assertFalse(pool.getAllocatedSlots().contains(allocationID3)); + assertTrue(pool.getAvailableSlots().contains(allocationID3)); + } + + @Test + public 
void testProviderAndOw
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236170#comment-16236170 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148569315 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw } private void checkTimeoutSlotAllocation(AllocationID allocationID) { + removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out")); + } + + private void removePendingRequestWithException(AllocationID allocationID, Exception e) { PendingRequest request = pendingRequests.remove(allocationID); - if (request != null && !request.getFuture().isDone()) { - request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out")); + if (request != null && (!request.getFuture().isDone() || request.getFuture().isCompletedExceptionally())) { + //TODO: the following line depends on the pr: https://github.com/apache/flink/pull/4887 + //if (resourceManagerGateway != null) { + // resourceManagerGateway.cancelSlotRequest(jobId, jobMasterId, allocationID); + //} --- End diff -- This should be removed and added once #4887 has been merged. 
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236190#comment-16236190 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148602423 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); --- End diff -- I think the test could look the following: ``` slotPoolGateway.allocateSlot(); CompletableFuture numberPendingRequestsFuture = slotPoolGateway.requestNumberPendingRequests(); assertEquals(1, numberPendingRequestsFuture.get()); slotPoolGateway.cancelAllocation(); CompletableFuture numberPendingRequestsFuture = 
slotPoolGateway.requestNumberPendingRequests(); assertEquals(0, numberPendingRequestsFuture.get()); ```
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236186#comment-16236186 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148601504 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -645,6 +668,21 @@ AllocatedSlots getAllocatedSlots() { return allocatedSlots; } + @VisibleForTesting + AvailableSlots getAvailableSlots() { + return availableSlots; + } + + @VisibleForTesting + int getNumOfWaitingForResourceRequests() { + return waitingForResourceManager.size(); + } + + @VisibleForTesting + int getNumOfPendingRequests() { + return pendingRequests.size(); + } --- End diff -- I think we should not make internal state easily accessible, because it will usually be modified by the main thread. Also, when checking a certain interleaving, you might be misled into thinking that you can do something like ``` slotPool.asyncAddPendingRequest() slotPool.getNumOfPendingRequests() // this returns +1 pending requests ``` This might work, but sometimes it does not, because the concurrent operation has not completed yet. I would like to make concurrent operations explicit, for example by returning a `CompletableFuture getNumberOfPendingRequests`, if at all.
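The reviewer's point about explicit concurrency can be sketched with a component whose state is only touched by one single-threaded "main thread" executor, loosely mimicking how an RPC endpoint serialises its calls. Because the count query is also submitted to that executor, it runs after every call submitted before it instead of racing with them, which is what a future-returning `getNumberOfPendingRequests` buys a test. `MainThreadPool` and its methods are hypothetical and not Flink's `SlotPool` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical component whose state is confined to a single "main thread". */
class MainThreadPool implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Only ever read or written from mainThread, so it needs no locking.
    private final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();

    CompletableFuture<Void> allocateSlot(String allocationId) {
        return CompletableFuture.runAsync(
            () -> pendingRequests.put(allocationId, new CompletableFuture<>()), mainThread);
    }

    CompletableFuture<Void> cancelSlotAllocation(String allocationId) {
        return CompletableFuture.runAsync(() -> {
            CompletableFuture<String> request = pendingRequests.remove(allocationId);
            if (request != null) {
                request.cancel(false);
            }
        }, mainThread);
    }

    // Runs on the main thread as well: a single-threaded executor processes tasks in
    // submission order, so the answer reflects all previously submitted calls.
    CompletableFuture<Integer> requestNumberOfPendingRequests() {
        return CompletableFuture.supplyAsync(pendingRequests::size, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

Usage mirrors the test sequence suggested in the review: call `allocateSlot`, then assert on `requestNumberOfPendingRequests().join()`, without reading the internal map from the test thread.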
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236176#comment-16236176 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570094 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); --- End diff -- Better to check with `instanceOf` I think. 
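Comparing `e.getCause().getClass()` with `assertEquals` fails as soon as a subclass of the expected exception is thrown, while an `instanceof`-style check accepts the type and its subclasses. In JUnit 4 with Hamcrest this is usually written `assertThat(e.getCause(), instanceOf(AskTimeoutException.class))`. The plain-Java sketch below shows the difference; `AskTimeout` is a hypothetical subclass of `TimeoutException` standing in for a more specific timeout type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical subclass, standing in for a more specific timeout exception. */
class AskTimeout extends TimeoutException {
    AskTimeout(String message) {
        super(message);
    }
}

class CauseChecks {
    // Exact class comparison: rejects subclasses of the expected type.
    static boolean sameClass(Throwable t, Class<?> expected) {
        return t.getClass().equals(expected);
    }

    // instanceof-style check via Class.isInstance: accepts the type and its subclasses.
    static boolean isA(Throwable t, Class<?> expected) {
        return expected.isInstance(t);
    }

    // Returns the failure cause of a completed future, or null if it succeeded.
    static Throwable causeOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```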
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236189#comment-16236189 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148602544 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); --- End diff -- It's not guaranteed that `pool.getNumOfPendingRequests()` is executed after `pool.cancelSlotAllocation` has completed.
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236171#comment-16236171 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570278 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. test the pending request is in pendingRequests --- End diff -- This should be a separate test. 
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236181#comment-16236181 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570875 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. 
test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); + + // 3. test the allocation is timed out in client side but the request is fulfilled in slot pool + AllocationID allocationID3 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID3, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + ResourceID resourceID = ResourceID.generate(); + AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, DEFAULT_TESTING_PROFILE); + slotPoolGateway.registerTaskManager(resourceID); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + assertEquals(0, pool.getNumOfPendingRequests()); + assertTrue(pool.getAllocatedSlots().contains(allocationID3)); + + pool.cancelSlotAllocation(allocationID3); + assertFalse(pool.getAllocatedSlots().contains(allocationID3)); + assertTrue(pool.getAvailableSlots().contains(allocationID3)); + } + + @Test + public 
void testProviderAndOw
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236178#comment-16236178 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148596180 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -262,23 +263,36 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - ScheduledUnit task, + public CompletableFuture allocateSlot(AllocationID allocationID, ResourceProfile resources, Iterable locationPreferences, Time timeout) { - return internalAllocateSlot(task, resources, locationPreferences); + return internalAllocateSlot(allocationID, resources, locationPreferences); } @Override public void returnAllocatedSlot(Slot slot) { internalReturnAllocatedSlot(slot); } + @Override + public void cancelSlotAllocation(AllocationID allocationID) { + waitingForResourceManager.remove(allocationID); + + removePendingRequestWithException(allocationID, new CancellationException("Allocation " + allocationID + " cancelled")); + + if (allocatedSlots.contains(allocationID)) { + Slot slot = allocatedSlots.get(allocationID); --- End diff -- We could avoid the `contains` call by simply calling `get` and then comparing against `null`.
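The single-lookup idea applies directly to a `java.util.Map`: a lookup that returns `null` for unknown keys replaces the `contains` check, and `Map.remove` goes one step further by returning the previous value, so lookup and removal collapse into one call. The sketch also hands the slot back to the available set, which is what prevents the leak described in FLINK-6434. `SlotBookkeeping` is hypothetical; Flink's `AllocatedSlots` and `AvailableSlots` classes have their own APIs, and the sketch assumes slots are never stored as `null`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified bookkeeping; not Flink's AllocatedSlots/AvailableSlots. */
class SlotBookkeeping {
    final Map<String, String> allocatedSlots = new HashMap<>(); // allocationId -> slot
    final Deque<String> availableSlots = new ArrayDeque<>();

    void cancelAllocatedSlot(String allocationId) {
        // One lookup instead of contains() followed by get(): remove() returns
        // null when the allocation is unknown (slots are never stored as null).
        String slot = allocatedSlots.remove(allocationId);
        if (slot != null) {
            // Return the slot to the available pool so it can be reused instead of leaking.
            availableSlots.add(slot);
        }
    }
}
```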
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236188#comment-16236188 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148599978 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { --- End diff -- This is not needed if you add `throws Exception` to the test method. 
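Because `testCancelSlotAllocation` already declares `throws Exception`, any unexpected exception fails the test by itself, with its full stack trace, so the `catch (Exception e) { fail(...); }` branch adds nothing. The sketch below illustrates the leaner shape in plain Java (no JUnit, so it runs standalone). It is an assumption-laden illustration, not the PR's test: `allocateSlotThatTimesOut` is a hypothetical stand-in for the gateway call, and `java.util.concurrent.TimeoutException` substitutes for Akka's `AskTimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ThrowsExceptionTestSketch {

    // Hypothetical stand-in for slotPoolGateway.allocateSlot(...): a future that
    // has already failed with a timeout.
    static CompletableFuture<String> allocateSlotThatTimesOut() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("ask timed out"));
        return future;
    }

    // Shaped like a JUnit test method: "throws Exception" lets any unexpected
    // exception escape and fail the test, so only the expected path is caught.
    static void testAllocationTimesOut() throws Exception {
        CompletableFuture<String> future = allocateSlotThatTimesOut();
        try {
            future.get(100, TimeUnit.MILLISECONDS);
            throw new AssertionError("We expected a TimeoutException.");
        } catch (ExecutionException e) {
            // get() wraps the failure cause in an ExecutionException.
            if (!(e.getCause() instanceof TimeoutException)) {
                throw new AssertionError("wrong cause: " + e.getCause());
            }
        }
    }
}
```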
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236175#comment-16236175 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570569 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfWaitingForResourceRequests()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, pool.getNumOfWaitingForResourceRequests()); + + // 2. test the pending request is in pendingRequests + ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID2 = new AllocationID(); + future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertEquals(AskTimeoutException.class, e.getCause().getClass()); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + + assertEquals(1, pool.getNumOfPendingRequests()); + + pool.cancelSlotAllocation(allocationID2); + assertEquals(0, pool.getNumOfPendingRequests()); + //verify(resourceManagerGateway, times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2); --- End diff -- should be removed
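The test drives one request into each of the two queues the diff touches (`waitingForResourceManager` before a resource manager is connected, `pendingRequests` afterwards) and checks that `cancelSlotAllocation` empties both. The following is a hypothetical, single-threaded model of that behavior, not the `SlotPool` code: String ids stand in for `AllocationID`, and the cancelled future is failed with a `CancellationException`, as the diff's `cancelSlotAllocation` does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Hypothetical single-threaded model of the two request queues exercised by
// testCancelSlotAllocation. String ids stand in for AllocationID.
class SlotRequestQueuesSketch {
    private final Map<String, CompletableFuture<String>> waitingForResourceManager = new HashMap<>();
    private final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();
    private boolean resourceManagerConnected;

    void connectToResourceManager() {
        resourceManagerConnected = true;
    }

    CompletableFuture<String> allocateSlot(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Without a resource manager the request is parked; otherwise it is
        // treated as sent and waits in pendingRequests for a slot offer.
        if (resourceManagerConnected) {
            pendingRequests.put(allocationId, future);
        } else {
            waitingForResourceManager.put(allocationId, future);
        }
        return future;
    }

    void cancelSlotAllocation(String allocationId) {
        CancellationException cause =
                new CancellationException("Allocation " + allocationId + " cancelled");
        // Remove from both queues so a later slot offer cannot fulfill a stale
        // request. completeExceptionally does nothing if the future is already done.
        CompletableFuture<String> waiting = waitingForResourceManager.remove(allocationId);
        if (waiting != null) {
            waiting.completeExceptionally(cause);
        }
        CompletableFuture<String> pending = pendingRequests.remove(allocationId);
        if (pending != null) {
            pending.completeExceptionally(cause);
        }
    }

    int getNumOfWaitingForResourceRequests() {
        return waitingForResourceManager.size();
    }

    int getNumOfPendingRequests() {
        return pendingRequests.size();
    }
}
```

Reading the queue sizes directly is only safe here because the model is single-threaded; in the real RPC endpoint the same state is owned by the main thread.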
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236179#comment-16236179 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570462 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here --- End diff -- Timeout should be lower
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236177#comment-16236177 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148572112 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java --- @@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) { } } - private static ResourceManagerGateway createResourceManagerGatewayMock() { + static ResourceManagerGateway createResourceManagerGatewayMock() { ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); --- End diff -- We could think about implementing a `SimpleAckingResourceManagerGateway` for testing purposes. That way we avoid mocking too much.
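One way to read the `SimpleAckingResourceManagerGateway` suggestion is a small hand-written test double that acknowledges every request and records the calls it receives, so a test can assert on interactions (for example, that a slot request was cancelled) without Mockito. The real `ResourceManagerGateway` has many more methods with different signatures; the interface `SlotRequestGateway` and class `SimpleAckingSlotRequestGateway` below are hypothetical, trimmed-down stand-ins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down gateway interface; not Flink's ResourceManagerGateway.
interface SlotRequestGateway {
    CompletableFuture<Boolean> requestSlot(String allocationId);

    void cancelSlotRequest(String allocationId);
}

// Test double that acknowledges every request and records what it was asked,
// so tests can assert on interactions without a mocking framework.
class SimpleAckingSlotRequestGateway implements SlotRequestGateway {
    private final List<String> requested = new ArrayList<>();
    private final List<String> cancelled = new ArrayList<>();

    @Override
    public CompletableFuture<Boolean> requestSlot(String allocationId) {
        requested.add(allocationId);
        return CompletableFuture.completedFuture(Boolean.TRUE); // always acknowledge
    }

    @Override
    public void cancelSlotRequest(String allocationId) {
        cancelled.add(allocationId);
    }

    List<String> getRequested() {
        return Collections.unmodifiableList(requested);
    }

    List<String> getCancelled() {
        return Collections.unmodifiableList(cancelled);
    }
}
```

A plain assertion on `getCancelled()` would then take the place of a Mockito `verify(...)` call.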
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236172#comment-16236172 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148570387 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java --- @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocation() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(3) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + // 1. test the pending request is in waitingResourceManagerRequests + AllocationID allocationID = new AllocationID(); + CompletableFuture future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1)); + + try { + future.get(2, TimeUnit.SECONDS); --- End diff -- Timeouts should be lower.
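Each expected-to-fail step in the test waits for a timeout, so the test's runtime is roughly the sum of those timeouts. The sketch below shows the same structure with millisecond values: a short timeout on the request itself and a larger outer `get()` bound that only acts as a safety net against flakiness. The values and the class name are illustrative assumptions, and `CompletableFuture.orTimeout` requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ShortTimeoutSketch {
    // Illustrative values: the request times out quickly; the outer wait is a
    // generous upper bound that should never be reached in a passing run.
    static final long REQUEST_TIMEOUT_MS = 10L;
    static final long TEST_WAIT_MS = 1_000L;

    /** Waits for a request that nobody answers and returns the cause it failed with. */
    static Throwable awaitFailure() throws InterruptedException, TimeoutException {
        // Stand-in for an allocation request that no resource manager fulfills.
        CompletableFuture<String> request = new CompletableFuture<String>()
                .orTimeout(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        try {
            request.get(TEST_WAIT_MS, TimeUnit.MILLISECONDS);
            throw new AssertionError("request unexpectedly completed");
        } catch (ExecutionException e) {
            // The request's own timeout fired first and failed the future.
            return e.getCause();
        }
    }
}
```

Keeping the request timeout well below the outer wait preserves the test's intent (the allocation fails on its own) while cutting the wait from seconds to milliseconds.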
[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool
[ https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236173#comment-16236173 ] ASF GitHub Bot commented on FLINK-6434: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148592528 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw } private void checkTimeoutSlotAllocation(AllocationID allocationID) { + removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out")); + } + + private void removePendingRequestWithException(AllocationID allocationID, Exception e) { --- End diff -- Maybe we could refactor this method into `failPendingRequest(PendingRequest, Exception)`; it could then be used by both `checkTimeoutSlotAllocation` and `checkTimeoutRequestWaitingForResourceManager`.
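A possible shape for the suggested refactoring: each timeout check removes the request from its own queue and then delegates to one shared `failPendingRequest(PendingRequest, Exception)` helper. This is a hypothetical sketch, not the PR's code: `PendingRequest` is reduced to an id plus a future, String ids stand in for `AllocationID`, and the exception message for the resource-manager queue is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the refactoring; String ids stand in for AllocationID.
class PendingRequestSketch {

    static final class PendingRequest {
        final String allocationId;
        final CompletableFuture<String> future = new CompletableFuture<>();

        PendingRequest(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    final Map<String, PendingRequest> pendingRequests = new HashMap<>();
    final Map<String, PendingRequest> waitingForResourceManager = new HashMap<>();

    // Shared helper: takes the request itself, so it works for either queue.
    // Removing the request from the right map is the caller's job.
    private static void failPendingRequest(PendingRequest request, Exception cause) {
        request.future.completeExceptionally(cause); // no-op if already completed
    }

    void checkTimeoutSlotAllocation(String allocationId) {
        PendingRequest request = pendingRequests.remove(allocationId);
        if (request != null) {
            failPendingRequest(request, new TimeoutException(
                    "Slot allocation request " + allocationId + " timed out"));
        }
    }

    void checkTimeoutRequestWaitingForResourceManager(String allocationId) {
        PendingRequest request = waitingForResourceManager.remove(allocationId);
        if (request != null) {
            // Illustrative message; not taken from the PR.
            failPendingRequest(request, new TimeoutException(
                    "Slot request " + allocationId + " timed out waiting for a resource manager"));
        }
    }
}
```

Passing the `PendingRequest` instead of the id keeps the helper independent of which map the request lived in, which is what lets both timeout paths share it.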
[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148601504 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -645,6 +668,21 @@ AllocatedSlots getAllocatedSlots() { return allocatedSlots; } + @VisibleForTesting + AvailableSlots getAvailableSlots() { + return availableSlots; + } + + @VisibleForTesting + int getNumOfWaitingForResourceRequests() { + return waitingForResourceManager.size(); + } + + @VisibleForTesting + int getNumOfPendingRequests() { + return pendingRequests.size(); + } --- End diff -- I think we should not make internal state easily accessible, because it is usually modified by the main thread. Also, when checking a certain interleaving, you might be misled into thinking that you can do something like
```
slotPool.asyncAddPendingRequest()
slotPool.getNumOfPendingRequests() // this returns +1 pending requests
```
This might work, but sometimes it will not, because the concurrent operation has not completed yet. I would like to make concurrent operations explicit by, for example, returning a `CompletableFuture getNumberOfPendingRequests` if at all. ---
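A minimal sketch of the "make concurrency explicit" idea, assuming a single-thread executor plays the role of the RPC endpoint's main thread: both the mutation and the query run on that thread and return futures, so a caller can only observe the count after the work it depends on has run. The class `MainThreadStateSketch` and its method names are hypothetical and do not reflect Flink's `SlotPool` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: a single-thread executor stands in for the main thread,
// which is the only thread that touches pendingRequests.
class MainThreadStateSketch implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();

    // Asynchronous mutation: the returned future completes once the request
    // has actually been registered on the main thread.
    CompletableFuture<Void> addPendingRequest(String allocationId) {
        return CompletableFuture.runAsync(
                () -> pendingRequests.put(allocationId, new CompletableFuture<>()), mainThread);
    }

    // The query also runs on the main thread; since a single-thread executor runs
    // tasks sequentially in submission order, it sees every mutation submitted before it.
    CompletableFuture<Integer> getNumberOfPendingRequests() {
        return CompletableFuture.supplyAsync(pendingRequests::size, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}
```

Compared with a plain `int` getter, the future-returning query makes the ordering dependency visible at the call site instead of relying on timing.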
[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148569315 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw } private void checkTimeoutSlotAllocation(AllocationID allocationID) { + removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out")); + } + + private void removePendingRequestWithException(AllocationID allocationID, Exception e) { PendingRequest request = pendingRequests.remove(allocationID); - if (request != null && !request.getFuture().isDone()) { - request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out")); + if (request != null && (!request.getFuture().isDone() || request.getFuture().isCompletedExceptionally())) { + //TODO: the following line depends on the pr: https://github.com/apache/flink/pull/4887 + //if (resourceManagerGateway != null) { + // resourceManagerGateway.cancelSlotRequest(jobId, jobMasterId, allocationID); + //} --- End diff -- This should be removed and added once #4887 has been merged. ---
[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148596180 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -262,23 +263,36 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - ScheduledUnit task, + public CompletableFuture allocateSlot(AllocationID allocationID, ResourceProfile resources, Iterable locationPreferences, Time timeout) { - return internalAllocateSlot(task, resources, locationPreferences); + return internalAllocateSlot(allocationID, resources, locationPreferences); } @Override public void returnAllocatedSlot(Slot slot) { internalReturnAllocatedSlot(slot); } + @Override + public void cancelSlotAllocation(AllocationID allocationID) { + waitingForResourceManager.remove(allocationID); + + removePendingRequestWithException(allocationID, new CancellationException("Allocation " + allocationID + " cancelled")); + + if (allocatedSlots.contains(allocationID)) { + Slot slot = allocatedSlots.get(allocationID); --- End diff -- We could avoid the `contains` call by simply calling `get` and then comparing the result against `null`. ---
[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148592528 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -361,9 +374,19 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw } private void checkTimeoutSlotAllocation(AllocationID allocationID) { + removePendingRequestWithException(allocationID, new TimeoutException("Slot allocation request " + allocationID + " timed out")); + } + + private void removePendingRequestWithException(AllocationID allocationID, Exception e) { --- End diff -- Maybe we could refactor this method into `failPendingRequest(PendingRequest, Exception)`; it could then be used by both `checkTimeoutSlotAllocation` and `checkTimeoutRequestWaitingForResourceManager`. ---