[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671443#comment-16671443
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter commented on issue #6972: [FLINK-9635][scheduling] Avoid task 
spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972#issuecomment-435000698
 
 
   Thanks @tillrohrmann !


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.
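
(Illustration, not part of the ticket: a minimal sketch of the idea behind the
fix, based on the SlotProfile changes quoted in the comments below. The class
and helper name are hypothetical; the new previousExecutionGraphAllocations
set acts as a blacklist, so a candidate slot is rejected only when it belonged
to the previous execution graph and is not the task's own prior slot. Fresh
slots can then be reused instead of forcing a new allocation.)

import org.apache.flink.runtime.clusterframework.types.AllocationID;

import java.util.Collection;
import java.util.Set;

final class BlacklistSketch {

	/**
	 * True if the candidate slot must not be used for this task because it
	 * may still cache local state of a different task from the previous run.
	 */
	static boolean isBlacklisted(
			AllocationID candidate,
			Collection<AllocationID> preferredAllocations,
			Set<AllocationID> previousExecutionGraphAllocations) {

		return previousExecutionGraphAllocations.contains(candidate)
			&& !preferredAllocations.contains(candidate);
	}
}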



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671441#comment-16671441
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann closed pull request #6972: [FLINK-9635][scheduling] Avoid task 
spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index 7cb364d687f..ab682a02dd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 
 /**
  * A slot profile describes the profile of a slot into which a task wants to 
be scheduled. The profile contains
@@ -47,16 +48,30 @@
 
	/** This contains desired allocation ids of the slot. */
	@Nonnull
-	private final Collection<AllocationID> priorAllocations;
+	private final Collection<AllocationID> preferredAllocations;
+
+	/** This contains all prior allocation ids from the whole execution graph. */
+	@Nonnull
+	private final Set<AllocationID> previousExecutionGraphAllocations;
+
+	public SlotProfile(
+			@Nonnull ResourceProfile resourceProfile,
+			@Nonnull Collection<TaskManagerLocation> preferredLocations,
+			@Nonnull Collection<AllocationID> preferredAllocations) {
+
+		this(resourceProfile, preferredLocations, preferredAllocations, Collections.emptySet());
+	}
 
	public SlotProfile(
			@Nonnull ResourceProfile resourceProfile,
			@Nonnull Collection<TaskManagerLocation> preferredLocations,
-			@Nonnull Collection<AllocationID> priorAllocations) {
+			@Nonnull Collection<AllocationID> preferredAllocations,
+			@Nonnull Set<AllocationID> previousExecutionGraphAllocations) {
 
		this.resourceProfile = resourceProfile;
		this.preferredLocations = preferredLocations;
-		this.priorAllocations = priorAllocations;
+		this.preferredAllocations = preferredAllocations;
+		this.previousExecutionGraphAllocations = previousExecutionGraphAllocations;
	}
 
/**
@@ -79,8 +94,18 @@ public ResourceProfile getResourceProfile() {
 * Returns the desired allocation ids for the slot.
 */
@Nonnull
-	public Collection<AllocationID> getPriorAllocations() {
-		return priorAllocations;
+	public Collection<AllocationID> getPreferredAllocations() {
+		return preferredAllocations;
+	}
+
+	/**
+	 * Returns a set of all previous allocation ids from the execution graph.
+	 *
+	 * <p>This is optional and can be empty if unused.
+	 */
+	@Nonnull
+	public Set<AllocationID> getPreviousExecutionGraphAllocations() {
+		return previousExecutionGraphAllocations;
	}
 
/**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..cbf51037b8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -58,6 +58,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -65,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -385,7 +387,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   Collections.emptySet());
}
 
/**
@@ -397,18 +400,22 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
 * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
 *   immediately deploy it.
 * @param locationPreferenceConstraint constraint for the location 
preferences
+* @param 

[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669884#comment-16669884
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter opened a new pull request #6972: [FLINK-9635][scheduling] Avoid 
task spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972
 
 
   …al recovery
   
   This PR is just a backport of FLINK-9635, PR #6961 to the release-1.6 branch.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669099#comment-16669099
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

asfgit closed pull request #6961: [FLINK-9635] Fix scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index 7cb364d687f..ab682a02dd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 
 /**
  * A slot profile describes the profile of a slot into which a task wants to 
be scheduled. The profile contains
@@ -47,16 +48,30 @@
 
	/** This contains desired allocation ids of the slot. */
	@Nonnull
-	private final Collection<AllocationID> priorAllocations;
+	private final Collection<AllocationID> preferredAllocations;
+
+	/** This contains all prior allocation ids from the whole execution graph. */
+	@Nonnull
+	private final Set<AllocationID> previousExecutionGraphAllocations;
+
+	public SlotProfile(
+			@Nonnull ResourceProfile resourceProfile,
+			@Nonnull Collection<TaskManagerLocation> preferredLocations,
+			@Nonnull Collection<AllocationID> preferredAllocations) {
+
+		this(resourceProfile, preferredLocations, preferredAllocations, Collections.emptySet());
+	}
 
	public SlotProfile(
			@Nonnull ResourceProfile resourceProfile,
			@Nonnull Collection<TaskManagerLocation> preferredLocations,
-			@Nonnull Collection<AllocationID> priorAllocations) {
+			@Nonnull Collection<AllocationID> preferredAllocations,
+			@Nonnull Set<AllocationID> previousExecutionGraphAllocations) {
 
		this.resourceProfile = resourceProfile;
		this.preferredLocations = preferredLocations;
-		this.priorAllocations = priorAllocations;
+		this.preferredAllocations = preferredAllocations;
+		this.previousExecutionGraphAllocations = previousExecutionGraphAllocations;
	}
 
/**
@@ -79,8 +94,18 @@ public ResourceProfile getResourceProfile() {
 * Returns the desired allocation ids for the slot.
 */
@Nonnull
-	public Collection<AllocationID> getPriorAllocations() {
-		return priorAllocations;
+	public Collection<AllocationID> getPreferredAllocations() {
+		return preferredAllocations;
+	}
+
+	/**
+	 * Returns a set of all previous allocation ids from the execution graph.
+	 *
+	 * <p>This is optional and can be empty if unused.
+	 */
+	@Nonnull
+	public Set<AllocationID> getPreviousExecutionGraphAllocations() {
+		return previousExecutionGraphAllocations;
	}
 
/**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..cbf51037b8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -58,6 +58,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -65,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -385,7 +387,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore 
taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   Collections.emptySet());
}
 
/**
@@ -397,18 +400,22 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
 * @param queued Flag to indicate whether the scheduler may queue this 
task if it cannot
 *   immediately deploy it.
 * @param locationPreferenceConstraint constraint for the location 
preferences
+* @param 

[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669061#comment-16669061
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter commented on issue #6961: [FLINK-9635] Fix scheduling for local 
recovery
URL: https://github.com/apache/flink/pull/6961#issuecomment-434388623
 
 
   Thanks for the review @tillrohrmann! Travis looks good, proceeding with the 
merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668753#comment-16668753
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229319663
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
 ##
 @@ -58,4 +36,11 @@
 * @return The gateway that can be used to send messages to the 
TaskManager.
 */
TaskManagerGateway getTaskManagerGateway();
+
+   /**
+* Returns the resource profile of the slot.
+*
+* @return the resource profile of the slot.
+*/
+   ResourceProfile getResourceProfile();
 
 Review comment:
   Agreed, we can remove it completely for this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668738#comment-16668738
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r22935
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
 ##
 @@ -48,35 +50,68 @@ private PreviousAllocationSchedulingStrategy() {}
@Override
	public <IN, OUT> OUT findMatchWithLocality(
		@Nonnull SlotProfile slotProfile,
-		@Nonnull Stream<IN> candidates,
-		@Nonnull Function<IN, SlotContext> contextExtractor,
+		@Nonnull Supplier<Stream<IN>> candidates,
 
 Review comment:
   Let's keep it like it is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668739#comment-16668739
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229310831
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
 ##
 @@ -58,4 +36,11 @@
 * @return The gateway that can be used to send messages to the 
TaskManager.
 */
TaskManagerGateway getTaskManagerGateway();
+
+   /**
+* Returns the resource profile of the slot.
+*
+* @return the resource profile of the slot.
+*/
+   ResourceProfile getResourceProfile();
 
 Review comment:
   Do we need to add this method in this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668679#comment-16668679
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229298268
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
 ##
 @@ -48,35 +50,68 @@ private PreviousAllocationSchedulingStrategy() {}
@Override
	public <IN, OUT> OUT findMatchWithLocality(
		@Nonnull SlotProfile slotProfile,
-		@Nonnull Stream<IN> candidates,
-		@Nonnull Function<IN, SlotContext> contextExtractor,
+		@Nonnull Supplier<Stream<IN>> candidates,
 
 Review comment:
   I think both ways have their pros and cons: if we use a supplier for a stream, 
we can benefit from the stream's lazy evaluation. In the one scheduling case 
where we need to iterate the candidates twice, having a collection would be 
better. I am fine with both variants if you think `Collection` is preferable.
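
   (For context: a java.util.stream.Stream can be traversed only once, which is
   what makes the choice between `Supplier<Stream<IN>>` and a plain `Collection`
   a real trade-off. A minimal, self-contained illustration:)

   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Supplier;
   import java.util.stream.Stream;

   public class StreamReuseDemo {

       public static void main(String[] args) {
           List<String> slots = Arrays.asList("slot-1", "slot-2", "slot-3");

           Stream<String> once = slots.stream();
           once.count();
           // once.count(); // IllegalStateException: stream has already been operated upon or closed

           // A supplier hands out a fresh stream per pass: evaluation stays lazy,
           // yet two matching passes (as in the scheduling case) are possible.
           Supplier<Stream<String>> candidates = slots::stream;
           System.out.println(candidates.get().count()); // 3
           System.out.println(candidates.get().count()); // 3, no exception
       }
   }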


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668678#comment-16668678
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229297355
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface that provides basic information in the context of a slot.
+ */
+public interface SlotInfo {
+
+   /**
+* Gets the id under which the slot has been allocated on the 
TaskManager. This id uniquely identifies the
+* physical slot.
+*
+* @return The id under which the slot has been allocated on the 
TaskManager
+*/
+   AllocationID getAllocationId();
+
+   /**
+* Gets the location info of the TaskManager that offers this slot.
+*
+* @return The location info of the TaskManager that offers this slot
+*/
+   TaskManagerLocation getTaskManagerLocation();
+
+   /**
+* Gets the number of the slot.
+*
+* @return The number of the slot on the TaskManager.
+*/
+   int getPhysicalSlotNumber();
+
+   /**
+* Returns the resource profile of the slot.
+*
+* @return the resource profile of the slot.
+*/
+   ResourceProfile getResourceProfile();
 
 Review comment:
   Ok, I'll move it back up into `SlotContext` for now, but I expect that it will 
land here in the longer run.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668676#comment-16668676
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229297185
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -1582,12 +1599,18 @@ void clear() {
 * An implementation of the {@link SlotOwner} and {@link SlotProvider} 
interfaces
 * that delegates methods as RPC calls to the SlotPool's RPC gateway.
 */
-   private static class ProviderAndOwner implements SlotOwner, 
SlotProvider {
+   public static class ProviderAndOwner implements SlotOwner, SlotProvider 
{
 
private final SlotPoolGateway gateway;
+   private final boolean requiresPreviousAllocationsForScheduling;
 
-   ProviderAndOwner(SlotPoolGateway gateway) {
+   ProviderAndOwner(SlotPoolGateway gateway, boolean 
requiresPreviousAllocationsForScheduling) {
this.gateway = gateway;
+   this.requiresPreviousAllocationsForScheduling = 
requiresPreviousAllocationsForScheduling;
+   }
+
+   public boolean requiresPreviousAllocationsForScheduling() {
 
 Review comment:
   I agree, and had the same problem. It will naturally go away when we 
continue with changes to the scheduling.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668540#comment-16668540
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229258870
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
 ##
 @@ -124,18 +124,31 @@ public void 
matchPreviousAllocationOverridesPreferredLocation() {
}
 
@Test
-   public void matchPreviousLocationNotAvailable() {
+   public void matchPreviousLocationNotAvailableButByLocality() {
 
SlotProfile slotProfile = new SlotProfile(resourceProfile, 
Collections.singletonList(tml4), Collections.singletonList(aidX));
SlotContext match = runMatching(slotProfile);
 
-   Assert.assertEquals(null, match);
+   Assert.assertEquals(ssc4, match);
+   }
+
+   @Test
+   public void matchPreviousLocationNotAvailableAndAllBlacklisted() {
+	HashSet<AllocationID> blacklisted = new HashSet<>(4);
+   blacklisted.add(aid1);
+   blacklisted.add(aid2);
+   blacklisted.add(aid3);
+   blacklisted.add(aid4);
+   SlotProfile slotProfile = new SlotProfile(resourceProfile, 
Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
+   SlotContext match = runMatching(slotProfile);
+
+   Assert.assertNull(match);
 
 Review comment:
   We should also add a test where not all allocations are blacklisted, and a 
test where all are blacklisted but the preferred allocation is contained in 
this set.
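
   (A hedged sketch of what the two suggested tests could look like, reusing
   the names from the diff above (resourceProfile, tml4, aid1..aid4, aidX,
   ssc4, runMatching); `ssc3` as the slot behind `aid3` and the exact
   assertions are assumptions, not the committed tests.)

	@Test
	public void matchPreviousLocationNotAvailableAndSomeBlacklisted() {
		// only part of the candidates is blacklisted; locality matching
		// should still pick the non-blacklisted slot at the preferred location
		HashSet<AllocationID> blacklisted = new HashSet<>(2);
		blacklisted.add(aid1);
		blacklisted.add(aid2);
		SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
		SlotContext match = runMatching(slotProfile);

		Assert.assertEquals(ssc4, match);
	}

	@Test
	public void matchPreviousLocationAllBlacklistedButPreferredAllocationContained() {
		// everything is blacklisted, but aid3 is this task's own prior
		// allocation, so matching it should still be allowed
		HashSet<AllocationID> blacklisted = new HashSet<>(4);
		blacklisted.add(aid1);
		blacklisted.add(aid2);
		blacklisted.add(aid3);
		blacklisted.add(aid4);
		SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aid3), blacklisted);
		SlotContext match = runMatching(slotProfile);

		Assert.assertEquals(ssc3, match);
	}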


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668533#comment-16668533
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229253693
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
 ##
 @@ -48,35 +50,68 @@ private PreviousAllocationSchedulingStrategy() {}
@Override
	public <IN, OUT> OUT findMatchWithLocality(
		@Nonnull SlotProfile slotProfile,
-		@Nonnull Stream<IN> candidates,
-		@Nonnull Function<IN, SlotContext> contextExtractor,
+		@Nonnull Supplier<Stream<IN>> candidates,
 
 Review comment:
   Not sure whether `Supplier<Stream<IN>>` is the best construct to be able to 
iterate over a collection of `IN` multiple times. I think either `Collection` 
or `Iterable` would serve the purpose better here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668538#comment-16668538
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229257454
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 ##
 @@ -157,7 +157,7 @@ public void testMultiRegionsFailover() throws Exception {
 
assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(ev11).getState());
 
-   ev21.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL);
+   ev21.scheduleForExecution(slotProvider, true, 
LocationPreferenceConstraint.ALL, null);
 
 Review comment:
   Same here with `null` in this file.
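
   (Presumably the fix is simply to pass an explicit empty set instead of
   `null`, since the new parameter is annotated `@Nonnull`; e.g.:)

	ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL, Collections.emptySet());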


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668535#comment-16668535
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229259226
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -1582,12 +1599,18 @@ void clear() {
 * An implementation of the {@link SlotOwner} and {@link SlotProvider} 
interfaces
 * that delegates methods as RPC calls to the SlotPool's RPC gateway.
 */
-   private static class ProviderAndOwner implements SlotOwner, 
SlotProvider {
+   public static class ProviderAndOwner implements SlotOwner, SlotProvider 
{
 
private final SlotPoolGateway gateway;
+   private final boolean requiresPreviousAllocationsForScheduling;
 
-   ProviderAndOwner(SlotPoolGateway gateway) {
+   ProviderAndOwner(SlotPoolGateway gateway, boolean 
requiresPreviousAllocationsForScheduling) {
this.gateway = gateway;
+   this.requiresPreviousAllocationsForScheduling = 
requiresPreviousAllocationsForScheduling;
+   }
+
+   public boolean requiresPreviousAllocationsForScheduling() {
 
 Review comment:
   This is a bit ugly imo but I don't see a better solution at the moment :-(


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668532#comment-16668532
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229250625
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -939,13 +942,18 @@ public void scheduleForExecution() throws JobException {
// collecting all the slots may resize and fail in that 
operation without slots getting lost
	final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
+   // a (temporary) optimization to avoid collecting the previous 
allocations for all executions again and again
 
 Review comment:
   Comment is also not 100% correct anymore, I think.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668542#comment-16668542
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229258079
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface that provides basic information in the context of a slot.
+ */
+public interface SlotInfo {
+
+   /**
+* Gets the id under which the slot has been allocated on the 
TaskManager. This id uniquely identifies the
+* physical slot.
+*
+* @return The id under which the slot has been allocated on the 
TaskManager
+*/
+   AllocationID getAllocationId();
+
+   /**
+* Gets the location info of the TaskManager that offers this slot.
+*
+* @return The location info of the TaskManager that offers this slot
+*/
+   TaskManagerLocation getTaskManagerLocation();
+
+   /**
+* Gets the number of the slot.
+*
+* @return The number of the slot on the TaskManager.
+*/
+   int getPhysicalSlotNumber();
+
+   /**
+* Returns the resource profile of the slot.
+*
+* @return the resource profile of the slot.
+*/
+   ResourceProfile getResourceProfile();
 
 Review comment:
   I'm not sure whether we need this method here at the moment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668534#comment-16668534
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229257342
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 ##
 @@ -71,7 +71,7 @@ public void testSlotReleasedWhenScheduledImmediately() {
 
assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
// try to deploy to the slot
-   vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
+   vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, null);
 
 Review comment:
   Same here with `null` in this file.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668529#comment-16668529
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229250429
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -902,14 +905,14 @@ public void scheduleForExecution() throws JobException {
private CompletableFuture scheduleLazy(SlotProvider slotProvider) 
{
 
	final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);
-
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
if (ejv.getJobVertex().isInputVertex()) {
final CompletableFuture 
schedulingJobVertexFuture = ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
-   LocationPreferenceConstraint.ALL); // 
since it is an input vertex, the input based location preferences should be 
empty
+   LocationPreferenceConstraint.ALL,// 
since it is an input vertex, the input based location preferences should be 
empty
+   Collections.emptySet()); // we provide 
empty set because we currently don't want to trigger a computation in the batch 
case
 
 Review comment:
   Comment could be updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668530#comment-16668530
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229250999
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ##
@@ -479,15 +482,20 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
	public CompletableFuture<Void> scheduleAll(
			SlotProvider slotProvider,
			boolean queued,
-			LocationPreferenceConstraint locationPreferenceConstraint) {
+			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
 
 Review comment:
   JavaDoc has not been updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed so 
> that a task tries to be rescheduled to its previous location. In order not 
> to occupy slots that have the state of other tasks cached, the strategy 
> requests a new slot if the old slot, identified by the previous allocation 
> id, is no longer present. This also applies to newly allocated slots, 
> because there is no distinction between new and already-used slots. This 
> behaviour can cause every task to be deployed to its own slot if, for 
> example, the {{SlotPool}} has released all slots in the meantime. The 
> consequence could be that a job can no longer be executed after a failure 
> because it needs more slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668537#comment-16668537
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229257277
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 ##
 @@ -454,7 +454,7 @@ public void testScheduleOrDeployAfterCancel() {
 		// it can occur as the result of races
 		{
 			Scheduler scheduler = mock(Scheduler.class);
-			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, null);
 
 Review comment:
   should not be `null` in this file I guess




[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668531#comment-16668531
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229251102
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +455,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 			SlotProvider slotProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   JavaDoc has not been updated.




[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668541#comment-16668541
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229257170
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 ##
 @@ -107,6 +107,7 @@ public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
 				slotProvider,
 				false,
 				LocationPreferenceConstraint.ALL,
+				null,
 
 Review comment:
   This should not be `null` but instead `Collections.emptySet` I guess.
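A minimal sketch of why `Collections.emptySet()` is the better choice for a `@Nonnull` collection parameter; `isBlacklisted` is a hypothetical stand-in, not the actual `scheduleForExecution` code:

import java.util.Collections;
import java.util.Set;

// With an empty set the callee can query the collection without a null
// check, which is exactly what a @Nonnull contract promises.
final class EmptySetVsNull {

    static boolean isBlacklisted(String allocationId, Set<String> previousAllocationIds) {
        // Safe even when the caller has no previous allocations to report.
        return previousAllocationIds.contains(allocationId);
    }

    public static void main(String[] args) {
        System.out.println(isBlacklisted("alloc-1", Collections.emptySet())); // false, no NPE
    }
}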




[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668536#comment-16668536
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229251157
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -402,13 +405,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 	public CompletableFuture<Void> scheduleForExecution(
 			SlotProvider slotProvider,
 			boolean queued,
-			LocationPreferenceConstraint locationPreferenceConstraint) {
+			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
 
 Review comment:
   JavaDoc has not been updated.




[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668539#comment-16668539
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

tillrohrmann commented on a change in pull request #6961: [FLINK-9635] Fix 
scheduling for local recovery
URL: https://github.com/apache/flink/pull/6961#discussion_r229255838
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -742,40 +761,52 @@ private void stashRequestWaitingForResourceManager(final PendingRequest pendingR
 	// 
 
 	@Override
-	public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+	public CompletableFuture<Acknowledge> releaseSlot(
+			SlotRequestId slotRequestId,
+			@Nullable SlotSharingGroupId slotSharingGroupId,
+			Throwable cause) {
+
 		log.debug("Releasing slot [{}] because: {}", slotRequestId, cause != null ? cause.getMessage() : "null");
+		return (slotSharingGroupId != null) ?
+			releaseSharedSlot(slotRequestId, slotSharingGroupId, cause) :
+			releaseSingleSlot(slotRequestId, cause);
+	}
 
-		if (slotSharingGroupId != null) {
-			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+	private CompletableFuture<Acknowledge> releaseSharedSlot(
+			SlotRequestId slotRequestId,
+			@Nonnull SlotSharingGroupId slotSharingGroupId, Throwable cause) {
 
-			if (multiTaskSlotManager != null) {
-				final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(slotRequestId);
+		final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
 
-				if (taskSlot != null) {
-					taskSlot.release(cause);
-				} else {
-					log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
-				}
+		if (multiTaskSlotManager != null) {
+			final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(slotRequestId);
+
+			if (taskSlot != null) {
+				taskSlot.release(cause);
 			} else {
-				log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
+				log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
 			}
 		} else {
-			final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
+			log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
+		}
+		return CompletableFuture.completedFuture(Acknowledge.get());
 
 Review comment:
   This line could be moved to `releaseSlot` since this method as well as 
`releaseSingleSlot` simply return a completed future.
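A minimal sketch of the suggested refactoring, using stand-in types (plain `String` ids, a local `Acknowledge` enum) instead of the actual `SlotPool` classes:

import java.util.concurrent.CompletableFuture;

// Since both release paths end in a completed future anyway, releaseSlot
// can own the single return and the two helpers can become void methods.
final class ReleaseSlotSketch {

    enum Acknowledge { INSTANCE }

    CompletableFuture<Acknowledge> releaseSlot(String slotRequestId, String slotSharingGroupId, Throwable cause) {
        if (slotSharingGroupId != null) {
            releaseSharedSlot(slotRequestId, slotSharingGroupId, cause);
        } else {
            releaseSingleSlot(slotRequestId, cause);
        }
        return CompletableFuture.completedFuture(Acknowledge.INSTANCE);
    }

    private void releaseSharedSlot(String slotRequestId, String slotSharingGroupId, Throwable cause) {
        // ... release the task slot in its sharing group ...
    }

    private void releaseSingleSlot(String slotRequestId, Throwable cause) {
        // ... release the individually allocated slot ...
    }
}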



[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667366#comment-16667366
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter opened a new pull request #6961: [FLINK-9635] Fix scheduling for 
local recovery
URL: https://github.com/apache/flink/pull/6961
 
 
   ## What is the purpose of the change
   
   This change fixes the task spread-out problem in scheduling with local 
recovery. The solution is based on creating a global set of all previous 
allocation ids that serves as a blacklist of allocations to avoid; all other 
allocation ids are now free to take again (see the sketch after this 
description).
   
   ## Brief change log
   
   This PR contains a subset of the changes from #6898 and focuses almost 
purely on the fix of local recovery.
   
   ## Verifying this change
   
   Improved `SchedulingITCase` should work now, including previously ignored 
tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
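
A minimal sketch of the blacklist idea described above, assuming hypothetical names (`pickSlot`, plain `String` allocation ids) rather than the actual `SlotProfile`/`SlotPool` API:

import java.util.List;
import java.util.Optional;
import java.util.Set;

// Prefer the task's own previous allocation, avoid allocations previously
// used by *other* tasks (their local state may still be cached there), and
// freely take any other slot, including brand-new ones.
final class BlacklistSlotSelection {

    static Optional<String> pickSlot(
            Set<String> preferredAllocations,   // this task's prior allocation ids
            Set<String> allPreviousAllocations, // blacklist: prior ids of all tasks
            List<String> freeSlots) {
        // 1. The exact prior location wins, enabling local recovery.
        for (String slot : freeSlots) {
            if (preferredAllocations.contains(slot)) {
                return Optional.of(slot);
            }
        }
        // 2. Otherwise take any slot that is not another task's prior location.
        for (String slot : freeSlots) {
            if (!allPreviousAllocations.contains(slot)) {
                return Optional.of(slot);
            }
        }
        // 3. Only now fall back to requesting a new slot.
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<String> mine = Set.of("alloc-1");
        Set<String> blacklist = Set.of("alloc-1", "alloc-2");
        // "alloc-9" is new, so it is reusable even though it is not alloc-1;
        // before the fix it would have been rejected.
        System.out.println(pickSlot(mine, blacklist, List.of("alloc-2", "alloc-9"))); // Optional[alloc-9]
    }
}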
   




[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-07 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641027#comment-16641027
 ] 

Till Rohrmann commented on FLINK-9635:
--

The problem has existed since Flink {{1.5.0}}. Therefore, I would not make it a 
blocker for {{1.7.0}}. However, the community is actively working on fixing 
this problem for {{1.7.0}}, so I hope that a fix can be included.



[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-02 Thread Gyula Fora (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635168#comment-16635168
 ] 

Gyula Fora commented on FLINK-9635:
---

Should we consider this issue a blocker? I know the proper fix is very hard and 
requires a lot of effort, but the current state is very unsafe as well.



[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-07-18 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547741#comment-16547741
 ] 

Till Rohrmann commented on FLINK-9635:
--

Yes, it still needs to be resolved. FLINK-9583 is just a quick fix which does 
not solve the problem if local recovery is enabled.



[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-07-17 Thread Deepak Sharma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546775#comment-16546775
 ] 

Deepak Sharma commented on FLINK-9635:
--

[~till.rohrmann], does this issue still need to be resolved, seeing as 
FLINK-9583 has been closed? I suppose this Jira tracks the long-term solution?
