[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162863#comment-15162863
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53926589
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
 ---
@@ -40,17 +48,142 @@ public void test() {
public boolean filter(Long value) { return false; }
};
 
-   env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-   .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-   .startNewChain().print();
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 1")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
+
+   // verify that a second pipeline does not inherit the groups from the first pipeline
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated-2")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 2")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
 
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
 
-   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+   assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+   assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+   assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
--- End diff --

That's how I found out the ordering. Apparently the sources appear first, 
followed by the rest of the individual pipelines. But here the first pipeline 
comes first. 

Given that a topological ordering is only partial and, thus, has multiple 
valid linearizations, I think this test is bound to break at some point. It is 
simply too tightly coupled to the internal implementation of the `JobGraph`.
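
To illustrate the concern, here is a minimal, self-contained sketch. It does not use Flink; the vertex names and the `LinearizationSketch` class are hypothetical stand-ins for two independent pipelines. It shows that both a "sources first" order and a "pipeline by pipeline" order are valid topological orderings of the same graph, so an assertion on a fixed index can end up looking at a different vertex.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LinearizationSketch {

    /**
     * Returns true if, for every edge (from -> to), 'from' appears before 'to'
     * in the given order. It does not check that the order is a permutation.
     */
    static boolean isTopologicalOrder(List<String> order, Map<String, List<String>> edges) {
        Map<String, Integer> position = new HashMap<>();
        for (int i = 0; i < order.size(); i++) {
            position.put(order.get(i), i);
        }
        for (Map.Entry<String, List<String>> edge : edges.entrySet()) {
            for (String target : edge.getValue()) {
                Integer from = position.get(edge.getKey());
                Integer to = position.get(target);
                if (from == null || to == null || from >= to) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Two independent pipelines: source -> filter -> sink (hypothetical names). */
    static Map<String, List<String>> twoPipelines() {
        Map<String, List<String>> edges = new HashMap<>();
        edges.put("source-1", Arrays.asList("filter-1"));
        edges.put("filter-1", Arrays.asList("sink-1"));
        edges.put("source-2", Arrays.asList("filter-2"));
        edges.put("filter-2", Arrays.asList("sink-2"));
        return edges;
    }

    static List<String> sourcesFirst() {
        return Arrays.asList("source-1", "source-2", "filter-1", "filter-2", "sink-1", "sink-2");
    }

    static List<String> pipelineByPipeline() {
        return Arrays.asList("source-1", "filter-1", "sink-1", "source-2", "filter-2", "sink-2");
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = twoPipelines();
        System.out.println(isTopologicalOrder(sourcesFirst(), edges));       // true
        System.out.println(isTopologicalOrder(pipelineByPipeline(), edges)); // true
        // Index 1 refers to a different vertex in each valid ordering.
        System.out.println(sourcesFirst().get(1) + " vs " + pipelineByPipeline().get(1));
    }
}
```

Which linearization `getVerticesSortedTopologicallyFromSources()` actually produces is an implementation detail, which is why index-based assertions are fragile here.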


> Fix Slot Sharing in Streaming API
> -
>
> Key: FLINK-3315
> URL: https://issues.apache.org/jira/browse/FLINK-3315
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The 
> slot sharing group that operators are put in depends on the order in which 
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink() 
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the 
> {{startNewResourceGroup()}} call and all operators that are transformed 
> afterwards in graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}} which can be used to isolate an 
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and 
> {{isolateResources()}} and replace them with {{slotSharingGroup(String)}}. By 
> default, operations would be in slot sharing group "default". This allows 
> very fine-grained control over which operators end up in which slot sharing 
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks") 
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> Which would isolate the lightweight sources and sinks in a group and put 
> 

[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162844#comment-15162844
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53925301
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
 ---
@@ -40,17 +48,142 @@ public void test() {
public boolean filter(Long value) { return false; }
};
 
-   env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-   .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-   .startNewChain().print();
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 1")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
+
+   // verify that a second pipeline does not inherit the groups from the first pipeline
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated-2")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 2")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
 
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
 
-   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+   assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+   assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+   assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
--- End diff --

It would be cumbersome, but giving unique names to the operators and then 
filtering vertices by name should work. (I think we have the ordering 
assumption in other places as well.)
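
The name-based lookup suggested here could be sketched as follows. This is a standalone illustration rather than Flink code: `NamedVertex` is a hypothetical stand-in for a job vertex, and the operator names are made up. In an actual test the vertex names would depend on how operators are named and chained, so the matching rule is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class VertexLookupSketch {

    /** Hypothetical stand-in for a job vertex: a unique name plus its slot sharing group. */
    static final class NamedVertex {
        final String name;
        final String slotSharingGroup;

        NamedVertex(String name, String slotSharingGroup) {
            this.name = name;
            this.slotSharingGroup = slotSharingGroup;
        }
    }

    /** Finds the single vertex with the given name, regardless of its position in the list. */
    static NamedVertex findByName(List<NamedVertex> vertices, String name) {
        NamedVertex match = null;
        for (NamedVertex vertex : vertices) {
            if (vertex.name.equals(name)) {
                if (match != null) {
                    throw new IllegalStateException("Duplicate vertex name: " + name);
                }
                match = vertex;
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("No vertex named: " + name);
        }
        return match;
    }

    /** Made-up vertices loosely mirroring the first pipeline of the test. */
    static List<NamedVertex> sampleVertices() {
        return new ArrayList<>(Arrays.asList(
                new NamedVertex("source-1", "default"),
                new NamedVertex("isolated-filter-1", "isolated"),
                new NamedVertex("default-filter-1", "default"),
                new NamedVertex("group-filter-1", "group 1")));
    }

    public static void main(String[] args) {
        List<NamedVertex> vertices = sampleVertices();
        // Reordering the list does not affect name-based assertions.
        Collections.reverse(vertices);
        System.out.println(findByName(vertices, "source-1").slotSharingGroup
                .equals(findByName(vertices, "default-filter-1").slotSharingGroup)); // true
    }
}
```

The assertions then express intent ("the source and the second filter share a group") instead of relying on positions in one particular linearization.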



[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159266#comment-15159266
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53817674
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
 ---
@@ -40,17 +48,142 @@ public void test() {
public boolean filter(Long value) { return false; }
};
 
-   env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-   .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-   .startNewChain().print();
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 1")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
+
+   // verify that a second pipeline does not inherit the groups from the first pipeline
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated-2")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 2")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
 
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
 
-   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+   assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+   assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+   assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
--- End diff --

Yes, but I think at this low level this is the only way to check that the 
slot sharing is passed through.


> Fix Slot Sharing in Streaming API
> -
>
> Key: FLINK-3315
> URL: https://issues.apache.org/jira/browse/FLINK-3315
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The 
> slot sharing group that operators are put in depends on the order in which 
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink() 
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the 
> {{startNewResourceGroup()}} call and all operators that are transformed 
> afterwards in graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}} which can be used to isolate an 
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and 
> {{isolateResources()}} and replace them with {{slotSharingGroup(String)}}. By 
> default, operations would be in slot sharing group "default". This allows 
> very fine-grained control over which operators end up in which slot sharing 
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks") 
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> Which would isolate the lightweight sources and sinks in a group and put 
> heavy operations inside their own slot groups.
> This is a bit more low level than the previous API and requires more calls 
> than a simple {{startNewResourceGroup()}} but I think not many people would 
> use this feature and this design makes it very clear what operations end up 
> in the same group.
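
The order dependence described in the issue can be reproduced with a toy model. The sketch below is not the actual `StreamGraphGenerator` logic; it assumes a single running counter that is bumped whenever an operation marked with `startNewResourceGroup()` is transformed, and it assumes the traversal order implied by the outcome quoted in the issue. The class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OrderDependentGroupsSketch {

    /** Hypothetical operation: a label plus whether startNewResourceGroup() was called on it. */
    static final class Op {
        final String name;
        final boolean startsNewGroup;

        Op(String name, boolean startsNewGroup) {
            this.name = name;
            this.startsNewGroup = startsNewGroup;
        }
    }

    /**
     * Assigns group ids with one running counter, in traversal order.
     * Everything transformed after a "start new group" marker inherits the new id,
     * even if it belongs to an unrelated pipeline.
     */
    static List<Integer> assignByCounter(List<Op> traversalOrder) {
        List<Integer> groups = new ArrayList<>();
        int current = 1;
        for (Op op : traversalOrder) {
            if (op.startsNewGroup) {
                current++;
            }
            groups.add(current);
        }
        return groups;
    }

    /** The example from the issue, in the assumed traversal order. */
    static List<Op> issueExample() {
        return Arrays.asList(
                new Op("source a", false),
                new Op("map a", true),   // a.map().startNewResourceGroup()
                new Op("sink a", false),
                new Op("source b", false),
                new Op("map b", false),
                new Op("sink b", false));
    }

    public static void main(String[] args) {
        // Only "source a" stays in group 1; the whole second pipeline lands in group 2.
        System.out.println(assignByCounter(issueExample())); // [1, 2, 2, 2, 2, 2]
    }
}
```

With explicit group names, as proposed above, the assignment no longer depends on which operation happens to be transformed first.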



[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159265#comment-15159265
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1641





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159233#comment-15159233
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53816034
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
 ---
@@ -40,17 +48,142 @@ public void test() {
public boolean filter(Long value) { return false; }
};
 
-   env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-   .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-   .startNewChain().print();
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 1")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
+
+   // verify that a second pipeline does not inherit the groups from the first pipeline
+   env.generateSequence(1, 10)
+   .filter(dummyFilter).slotSharingGroup("isolated-2")
+   .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+   .filter(dummyFilter).slotSharingGroup("group 2")
+   .filter(dummyFilter).startNewChain()
+   .print().disableChaining();
 
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
 
-   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+   assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+   assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+   assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+   assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
+
+   int pipelineStart = 6;
+   assertEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(pipelineStart + 2).getSlotSharingGroup());
+   assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(pipelineStart + 1).getSlotSharingGroup());
+   assertNotEquals(vertices.get(pipelineStart + 2).getSlotSharingGroup(), vertices.get(pipelineStart + 3).getSlotSharingGroup());
+   assertEquals(vertices.get(pipelineStart + 3).getSlotSharingGroup(), vertices.get(pipelineStart + 4).getSlotSharingGroup());
+   assertEquals(vertices.get(pipelineStart + 4).getSlotSharingGroup(), vertices.get(pipelineStart + 5).getSlotSharingGroup());
--- End diff --

Same here.



[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159215#comment-15159215
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1641#issuecomment-187797165
  
As far as I can tell, the changes look good to me. However, documentation 
is missing. I think the old `startNewResourceGroup` and `isolateResources` 
should be removed from the docs and the new feature should be documented.




[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159181#comment-15159181
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53811188
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -533,4 +563,33 @@ private StreamGraph generateInternal(List transformatio
return Collections.singleton(transform.getId());
}
 
+   /**
+* Determines the slot sharing group for an operation based on the slot sharing group set by
+* the user and the slot sharing groups of the inputs.
+*
+* If the user specifies a group name, this is taken as is. If nothing is specified and
+* the input operations all have the same group name then this name is taken. Otherwise the
+* default group is choosen.
+*
+* @param specifiedGroup The group specified by the user.
+* @param inputIds The IDs of the input operations.
+*/
+   private String determineSlotSharingGroup(String specifiedGroup, Collection inputIds) {
+   if (specifiedGroup != null) {
+   return specifiedGroup;
+   } else {
+   String inputGroup = null;
+   for (int id: inputIds) {
+   String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
+   if (inputGroup == null) {
+   inputGroup = inputGroupCandidate;
+   continue;
+   }
+   if (!inputGroup.equals(inputGroupCandidate)) {
--- End diff --

fixed




[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159154#comment-15159154
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53808877
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -533,4 +563,33 @@ private StreamGraph generateInternal(List transformatio
return Collections.singleton(transform.getId());
}
 
+   /**
+* Determines the slot sharing group for an operation based on the slot sharing group set by
+* the user and the slot sharing groups of the inputs.
+*
+* If the user specifies a group name, this is taken as is. If nothing is specified and
+* the input operations all have the same group name then this name is taken. Otherwise the
+* default group is choosen.
+*
+* @param specifiedGroup The group specified by the user.
+* @param inputIds The IDs of the input operations.
+*/
+   private String determineSlotSharingGroup(String specifiedGroup, Collection inputIds) {
+   if (specifiedGroup != null) {
+   return specifiedGroup;
+   } else {
+   String inputGroup = null;
+   for (int id: inputIds) {
+   String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
+   if (inputGroup == null) {
+   inputGroup = inputGroupCandidate;
+   continue;
+   }
+   if (!inputGroup.equals(inputGroupCandidate)) {
--- End diff --

Can't we get rid of the `continue` statement by making this an `else 
if(...)` branch?
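
A standalone sketch of the method with the suggested `else if` might look like the following. It is written under assumptions: the quoted diff is cut off before the body of the second `if`, so returning the default group there is inferred from the Javadoc, and a plain `Map` (the hypothetical `groupsById` parameter) stands in for `streamGraph.getSlotSharingGroup(id)`.

```java
import java.util.Collection;
import java.util.Map;

class SlotSharingGroupSketch {

    static final String DEFAULT_GROUP = "default";

    /**
     * Picks the slot sharing group for an operation:
     * the user-specified group if present, otherwise the common group of all inputs,
     * otherwise the default group.
     *
     * groupsById is a hypothetical lookup replacing streamGraph.getSlotSharingGroup(id);
     * it is assumed to contain a group for every input id.
     */
    static String determineSlotSharingGroup(
            String specifiedGroup, Collection<Integer> inputIds, Map<Integer, String> groupsById) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String inputGroupCandidate = groupsById.get(id);
            if (inputGroup == null) {
                // First input seen: remember its group.
                inputGroup = inputGroupCandidate;
            } else if (!inputGroup.equals(inputGroupCandidate)) {
                // Inputs disagree: fall back to the default group (assumed from the Javadoc).
                return DEFAULT_GROUP;
            }
        }
        // No inputs at all (e.g. a source) also ends up in the default group.
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }
}
```

The `else if` makes the two cases mutually exclusive, so the `continue` is no longer needed and the behavior is unchanged.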




[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157025#comment-15157025
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1641#issuecomment-187190568
  
Updated on top of master again. This should have the behavior that we want 
for 1.0.


> Fix Slot Sharing in Streaming API
> -
>
> Key: FLINK-3315
> URL: https://issues.apache.org/jira/browse/FLINK-3315
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The 
> slot sharing group that operators are put in depends on the order in which 
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink() 
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the 
> {{startNewResouceGroup()}} call and all operators that are transformed 
> afterwards in graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}} which can be used to isolate an 
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and 
> {{isolateResources()}} and replace them with {{slotSharingGroup(String)}}. By 
> default, operations would be in slot sharing group "default". This allows 
> very fine-grained control over what operators end up in which slot sharing 
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks") 
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> This would isolate the lightweight sources and sinks in their own groups 
> ("sources" and "sinks") and put the heavy operations inside their own slot 
> groups.
> This is a bit more low-level than the previous API and requires more calls 
> than a simple {{startNewResourceGroup()}}, but I think not many people would 
> use this feature, and this design makes it very clear which operations end 
> up in the same group.
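
The order dependence described above can be reproduced with a small stand-alone model. This is only a sketch, not Flink code: the class `CounterBasedGrouping`, the `Op` type and the assumed traversal order (pipeline a before pipeline b) are made up for illustration. It assigns group ids the way the description says the old translation did, with a single counter that is bumped at `startNewResourceGroup()` and then applies to everything translated afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the old behaviour; none of these names are Flink classes.
public class CounterBasedGrouping {

    /** One operator in traversal order; startsNewGroup stands for a startNewResourceGroup() call. */
    public static final class Op {
        final String name;
        final boolean startsNewGroup;

        public Op(String name, boolean startsNewGroup) {
            this.name = name;
            this.startsNewGroup = startsNewGroup;
        }
    }

    /** Assigns a group id to every operator using a single running counter. */
    public static List<Integer> assignGroups(List<Op> traversalOrder) {
        int groupId = 1;
        List<Integer> groups = new ArrayList<>();
        for (Op op : traversalOrder) {
            if (op.startsNewGroup) {
                groupId++; // the new id sticks for all operators translated later
            }
            groups.add(groupId);
        }
        return groups;
    }

    /** The topology from the description, assuming pipeline a is traversed before pipeline b. */
    public static List<Op> exampleTopology() {
        return Arrays.asList(
                new Op("source a", false),
                new Op("map a", true), // a.map().startNewResourceGroup()
                new Op("sink a", false),
                new Op("source b", false),
                new Op("map b", false),
                new Op("sink b", false));
    }

    public static void main(String[] args) {
        List<Op> ops = exampleTopology();
        List<Integer> groups = assignGroups(ops);
        for (int i = 0; i < ops.size(); i++) {
            System.out.println(ops.get(i).name + " -> group " + groups.get(i));
        }
    }
}
```

In this model only source a stays in group 1. Source b lands in group 2 even though nothing in its pipeline asked for a new group, which is the surprise that explicitly named groups avoid.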





[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150571#comment-15150571
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1641#issuecomment-185230357
  
rebasing





[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150468#comment-15150468
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1641#issuecomment-185196574
  
I think this needs to be rebased on top of the latest master.

The annotations are still called `@Experimental`.




[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148658#comment-15148658
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53017220
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ---
@@ -202,6 +203,29 @@ public String getUid() {
}
 
/**
+* Returns the slot sharing group of this transformation.
+*
+* @see #setSlotSharingGroup(String)
+*/
+   public String getSlotSharingGroup() {
+   return slotSharingGroup;
+   }
+
+   /**
+* Sets the slot sharing group of this transformation. Parallels instances of operations that
--- End diff --

Fixed




[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148596#comment-15148596
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53010068
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ---
@@ -202,6 +203,29 @@ public String getUid() {
}
 
/**
+* Returns the slot sharing group of this transformation.
+*
+* @see #setSlotSharingGroup(String)
+*/
+   public String getSlotSharingGroup() {
+   return slotSharingGroup;
+   }
+
+   /**
+* Sets the slot sharing group of this transformation. Parallels instances of operations that
--- End diff --

Typo: "Parallels" should be "Parallel" (also in the other variants of this method).




[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148548#comment-15148548
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1641

[FLINK-3315] Fix Slot Sharing in Streaming API

This changes the slot sharing settings to a single method,
slotSharingGroup(String), on DataStream.

Operations inherit the slot sharing group of their input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default". It can also be set
explicitly using slotSharingGroup("default"), which overrides the
inheriting behaviour.
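
The inheritance rule can be sketched without any Flink dependency. The class `SlotSharingResolver` and its `resolve` method are hypothetical names used only for illustration, and the fallback to "default" when the inputs disagree (or when there are no inputs) is an assumption; the description above only covers the case where all inputs share one group.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink: models how a group could be chosen for one operation.
public class SlotSharingResolver {

    public static final String DEFAULT_GROUP = "default";

    /**
     * @param explicitGroup group set via slotSharingGroup(String), or null if none was set
     * @param inputGroups   groups of all input operations (empty for a source)
     * @return the group the operation ends up in under the assumptions stated above
     */
    public static String resolve(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup; // an explicit setting overrides inheritance
        }
        if (inputGroups.isEmpty()) {
            return DEFAULT_GROUP; // assumption: nothing to inherit from
        }
        String first = inputGroups.get(0);
        for (String group : inputGroups) {
            if (!group.equals(first)) {
                return DEFAULT_GROUP; // assumption: inputs disagree, so fall back
            }
        }
        return first; // all inputs agree, so inherit their group
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, Arrays.asList("heavy", "heavy")));
        System.out.println(resolve(null, Arrays.asList("sources", "heavy")));
        System.out.println(resolve("default", Arrays.asList("heavy")));
        System.out.println(resolve(null, Collections.<String>emptyList()));
    }
}
```

The third call shows why setting "default" explicitly is useful: it pulls an operation back out of a group that it would otherwise inherit from its input.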

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink slotsharing-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1641


commit f9bd1d25b26e639318dc737c3f7a1ce75df445fe
Author: Aljoscha Krettek 
Date:   2016-02-02T12:11:12Z

[FLINK-3315] Fix Slot Sharing in Streaming API

This changes the slot sharing settings to a single method,
slotSharingGroup(String), on DataStream.

Operations inherit the slot sharing group of their input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default". It can also be set
explicitly using slotSharingGroup("default"), which overrides the
inheriting behaviour.



