[GitHub] flink pull request: [FLINK-1478] Add support for strictly local in...

2015-02-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/375#issuecomment-73477321
  
Looks good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


2015-02-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/375#issuecomment-73505358
  
Only minor remarks. 
Looks good otherwise.


2015-02-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/375#discussion_r24327047
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
@@ -260,15 +260,49 @@ public void connectToPredecessors(Map
 
     public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
 
-//        ExecutionVertex[] vertices = this.taskVertices;
-//        
-//        for (int i = 0; i < vertices.length; i++) {
-//            ExecutionVertex v = vertices[i];
-//            
-//            if (v.get 
-//        }
+        ExecutionVertex[] vertices = this.taskVertices;
 
-        for (ExecutionVertex ev : getTaskVertices()) {
+        // check if we need to do pre-assignment of tasks
+        if (inputSplitsPerSubtask != null) {
+            
+            final Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
+            final Map<String, Integer> assignments = new HashMap<String, Integer>();
+            
+            for (int i = 0; i < vertices.length; i++) {
+                List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i];
+                if (splitsForHost == null || splitsForHost.isEmpty()) {
+                    continue;
+                }
+                
+                String[] hostNames = splitsForHost.get(0).getHostnames();
+                if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
+                    continue;
+                }
+                
+                String host = hostNames[0];
+                ExecutionVertex v = vertices[i];
+                
+                List<Instance> instancesOnHost = instances.get(host);
+                
+                if (instancesOnHost == null || instancesOnHost.isEmpty()) {
+                    throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host
+                            + ". No TaskManager available on that host.");
+                }
+                
+                Integer pos = assignments.get(host);
+                if (pos == null) {
+                    pos = 0;
+                    assignments.put(host, 0);
+                } else {
+                    assignments.put(host, pos + 1 % instancesOnHost.size());
--- End diff --

Doesn't this potentially cause multiple subtasks to be assigned to the same instance?
I guess that would fail in the scheduler. Shouldn't we catch this case here and throw
a more detailed exception explaining why the scheduling constraint could not be
fulfilled?
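A minimal sketch of per-host round-robin assignment may make the question concrete. It assumes each host's TaskManager instances can be addressed by index 0..count-1, and all names in it (`HostRoundRobin`, `assign`, `NoResourceException`) are hypothetical, not Flink's API. It also illustrates a Java precedence detail in the quoted line: `%` binds tighter than `+`, so `pos + 1 % instancesOnHost.size()` evaluates as `pos + (1 % instancesOnHost.size())`. On a host with more than one instance, that value keeps growing and never wraps back to 0. The round-robin form is `(pos + 1) % instancesOnHost.size()`. Whenever a host has more subtasks than instances, round-robin necessarily places several subtasks on the same instance.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained sketch; not Flink's API. Instances on a host
// are identified only by their index 0..count-1.
final class HostRoundRobin {

    // Hypothetical stand-in for Flink's NoResourceAvailableException.
    static final class NoResourceException extends RuntimeException {
        NoResourceException(String message) {
            super(message);
        }
    }

    // What the quoted expression computes: % binds tighter than +, so this is
    // pos + (1 % count), which never wraps around when count > 1.
    static int unparenthesizedNext(int pos, int count) {
        return pos + 1 % count;
    }

    // Intended round-robin successor.
    static int nextPosition(int pos, int count) {
        return (pos + 1) % count;
    }

    /**
     * hosts[i] is the preferred host of subtask i; instancesPerHost maps a host
     * to the number of TaskManager instances on it. Returns, per subtask, the
     * index of the instance on its host, cycling round-robin per host.
     */
    static int[] assign(String[] hosts, Map<String, Integer> instancesPerHost) {
        Map<String, Integer> next = new HashMap<>();
        int[] result = new int[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            String host = hosts[i];
            int count = instancesPerHost.getOrDefault(host, 0);
            if (count == 0) {
                throw new NoResourceException("Cannot schedule a strictly local task to host "
                        + host + ". No TaskManager available on that host.");
            }
            int pos = next.getOrDefault(host, 0);
            result[i] = pos;
            next.put(host, nextPosition(pos, count));
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = assign(new String[] {"a", "a", "a", "b"}, Map.of("a", 2, "b", 1));
        System.out.println(Arrays.toString(r)); // [0, 1, 0, 0]
    }
}
```

Running `main` prints `[0, 1, 0, 0]`. The three subtasks on host `a` alternate between its two instances, so instance 0 receives two of them. Whether that is acceptable depends on the instance's slot count, not on the instance count.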


2015-02-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/375#discussion_r24329907
  

It should be possible for multiple subtasks to go to the same instance. If there
are too many, it would fail in the scheduler, yes. We can check that the number of
subtasks on the instance does not exceed the number of slots.

This seems to me like a workaround solution anyway (until we can tie splits to
tasks), so it might be okay.
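The check suggested above could look roughly like the following sketch. It counts the subtasks placed on each instance and fails with a message that names the oversubscribed instance. Instance ids, slot counts, and all names here (`SlotCapacityCheck`, `checkSlots`, `NoResourceException`) are hypothetical placeholders, not Flink's scheduler API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-instance slot check; names are illustrative,
// not Flink's API.
final class SlotCapacityCheck {

    // Hypothetical stand-in for Flink's NoResourceAvailableException.
    static final class NoResourceException extends RuntimeException {
        NoResourceException(String message) {
            super(message);
        }
    }

    /**
     * assignedInstance[i] is the instance chosen for subtask i; slotsPerInstance
     * maps an instance id to its number of task slots. Throws with a descriptive
     * message if any instance would receive more subtasks than it has slots.
     */
    static void checkSlots(String[] assignedInstance, Map<String, Integer> slotsPerInstance) {
        // Count how many subtasks were placed on each instance.
        Map<String, Integer> used = new HashMap<>();
        for (String instance : assignedInstance) {
            used.merge(instance, 1, Integer::sum);
        }
        // Compare each count against that instance's slot capacity.
        for (Map.Entry<String, Integer> e : used.entrySet()) {
            int slots = slotsPerInstance.getOrDefault(e.getKey(), 0);
            if (e.getValue() > slots) {
                throw new NoResourceException("Strictly local scheduling would place "
                        + e.getValue() + " subtasks on instance " + e.getKey()
                        + ", which has only " + slots + " slot(s).");
            }
        }
    }

    public static void main(String[] args) {
        // Fits: tm1 has 2 slots for 2 subtasks, tm2 has 1 slot for 1 subtask.
        checkSlots(new String[] {"tm1", "tm1", "tm2"}, Map.of("tm1", 2, "tm2", 1));
        try {
            // Does not fit: 2 subtasks on tm1, which has a single slot.
            checkSlots(new String[] {"tm1", "tm1"}, Map.of("tm1", 1));
        } catch (NoResourceException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This sketch counts after the fact, which keeps the check separate from the assignment loop. An alternative would be to track remaining slots per instance while assigning and fail at the first subtask that does not fit.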


2015-02-09 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/375#discussion_r24330528
  

Ah, yes, sure. I confused instances and slots...


2015-02-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/375

