[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162863#comment-15162863 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1641#discussion_r53926589

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java ---
@@ -40,17 +48,142 @@ public void test() {
             public boolean filter(Long value) { return false; }
         };
-        env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-            .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-            .startNewChain().print();
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 1")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
+
+        // verify that a second pipeline does not inherit the groups from the first pipeline
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated-2")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 2")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+        assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+        assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+        assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
--- End diff --

That's how I figured out the ordering: apparently the sources come first, followed by the remaining vertices of the individual pipelines, with the first pipeline ahead of the second. Since the job graph only defines a partial order, there are multiple valid linearizations (topological orderings) of it, so I think this test is bound to fail at some point. It is simply too tightly coupled to the internal implementation of the `JobGraph`.
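Till's concern can be illustrated with a small, self-contained toy model (this is not Flink's implementation of `getVerticesSortedTopologicallyFromSources()`). Two pipelines with no edges between them admit several topological orders. The sketch runs Kahn's algorithm twice, changing only which "ready" vertex is taken next, and both results respect every edge. The vertex names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: two independent pipelines admit more than one topological order. */
public class TopoOrderSketch {

    /**
     * Kahn's algorithm. The fifo flag only changes which vertex with in-degree 0
     * is emitted next; both variants produce a valid topological order.
     */
    public static List<String> topoSort(List<String> vertices, Map<String, List<String>> edges, boolean fifo) {
        Map<String, Integer> inDegree = new HashMap<>();
        for (String v : vertices) {
            inDegree.put(v, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String t : targets) {
                inDegree.merge(t, 1, Integer::sum);
            }
        }

        Deque<String> ready = new ArrayDeque<>();
        for (String v : vertices) {
            if (inDegree.get(v) == 0) {
                ready.addLast(v);
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            // FIFO takes the oldest ready vertex, LIFO the most recently readied one.
            String v = fifo ? ready.pollFirst() : ready.pollLast();
            order.add(v);
            for (String t : edges.getOrDefault(v, List.of())) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    ready.addLast(t);
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // Two pipelines with no edges between them, as in the test above.
        List<String> vertices = List.of("source-1", "filter-1", "sink-1", "source-2", "filter-2", "sink-2");
        Map<String, List<String>> edges = Map.of(
            "source-1", List.of("filter-1"), "filter-1", List.of("sink-1"),
            "source-2", List.of("filter-2"), "filter-2", List.of("sink-2"));

        // prints [source-1, source-2, filter-1, filter-2, sink-1, sink-2]
        System.out.println(topoSort(vertices, edges, true));
        // prints [source-2, filter-2, sink-2, source-1, filter-1, sink-1]
        System.out.println(topoSort(vertices, edges, false));
    }
}
```

An index-based assertion such as `vertices.get(1)` therefore relies on a tie-breaking rule that the notion of a topological order does not pin down.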
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15162844#comment-15162844 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/1641#discussion_r53925301

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java ---
@@ -40,17 +48,142 @@ public void test() {
             public boolean filter(Long value) { return false; }
         };
-        env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-            .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-            .startNewChain().print();
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 1")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
+
+        // verify that a second pipeline does not inherit the groups from the first pipeline
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated-2")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 2")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+        assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+        assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+        assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
--- End diff --

It would be cumbersome, but giving unique names to the operators and then filtering the vertices by name should work. (I think we have the ordering assumption in other places as well.)
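uce's suggestion could look roughly like the following sketch. It uses a hypothetical `Vertex` class in place of Flink's `JobVertex`, and the operator names are made up. In a real test the names would have to be set on the operators and matched against the job vertex names; since a chained vertex is likely named after all operators in its chain, the lookup would need to account for that.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of a name-based lookup, so assertions do not depend on vertex order. */
public class NameLookupSketch {

    /** Hypothetical stand-in for a job vertex: a unique name and a slot sharing group. */
    public static final class Vertex {
        public final String name;
        public final String slotSharingGroup;

        public Vertex(String name, String slotSharingGroup) {
            this.name = name;
            this.slotSharingGroup = slotSharingGroup;
        }
    }

    /** Returns the single vertex called name; fails loudly if it is missing or ambiguous. */
    public static Vertex byName(List<Vertex> vertices, String name) {
        List<Vertex> matches = vertices.stream()
            .filter(v -> v.name.equals(name))
            .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                "expected exactly one vertex named '" + name + "' but found " + matches.size());
        }
        return matches.get(0);
    }

    /** True if the two named vertices share a slot sharing group, whatever their list positions. */
    public static boolean sameGroup(List<Vertex> vertices, String a, String b) {
        return byName(vertices, a).slotSharingGroup.equals(byName(vertices, b).slotSharingGroup);
    }

    public static void main(String[] args) {
        List<Vertex> ordering1 = List.of(
            new Vertex("source-1", "default"),
            new Vertex("isolated-filter-1", "isolated"),
            new Vertex("default-filter-1", "default"));
        // Same vertices in a different order.
        List<Vertex> ordering2 = List.of(ordering1.get(2), ordering1.get(0), ordering1.get(1));

        for (List<Vertex> vertices : List.of(ordering1, ordering2)) {
            System.out.println(sameGroup(vertices, "source-1", "default-filter-1"));  // true
            System.out.println(sameGroup(vertices, "source-1", "isolated-filter-1")); // false
        }
    }
}
```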
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159266#comment-15159266 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1641#discussion_r53817674

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java ---
@@ -40,17 +48,142 @@ public void test() {
             public boolean filter(Long value) { return false; }
         };
-        env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-            .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-            .startNewChain().print();
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 1")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
+
+        // verify that a second pipeline does not inherit the groups from the first pipeline
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated-2")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 2")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+        assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+        assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+        assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
--- End diff --

Yes, but I think at this low level it is the only way to check that the slot sharing groups are passed through.

> Fix Slot Sharing in Streaming API
> ---------------------------------
>
>                 Key: FLINK-3315
>                 URL: https://issues.apache.org/jira/browse/FLINK-3315
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The
> slot sharing group that an operator is put in depends on the order in which
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink()
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when the
> {{startNewResourceGroup()}} call is transformed, and all operators that are
> transformed afterwards in the graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}}, which can be used to isolate an
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and
> {{isolateResources()}} and replace them with {{slotSharingGroup(String)}}. By
> default, operations would be in the slot sharing group "default". This allows
> very fine-grained control over which operators end up in which slot sharing
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks")
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> This would isolate the lightweight sources and sinks in dedicated groups and
> put each heavy operation in its own slot sharing group.
> This is a bit more low-level than the previous API and requires more calls
> than a simple {{startNewResourceGroup()}}, but I think not many people would
> use this feature, and this design makes it very clear which operations end up
> in the same group.
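The order dependence described in the issue can be reproduced with a simplified model of the old counter-based assignment. `Op` and `assign` are hypothetical names, and the model ignores chaining and `isolateResources()`; it only mirrors the stated rule that the id is incremented at `startNewResourceGroup()` and every operator transformed afterwards receives the new id.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of the old assignment: a counter is bumped when the traversal reaches an
 * operator marked with startNewResourceGroup(), and every operator visited afterwards gets
 * the new id, whichever pipeline it belongs to.
 */
public class CounterGroupSketch {

    /** Hypothetical operator descriptor: a name and whether it called startNewResourceGroup(). */
    public static final class Op {
        final String name;
        final boolean startsNewGroup;

        public Op(String name, boolean startsNewGroup) {
            this.name = name;
            this.startsNewGroup = startsNewGroup;
        }
    }

    /** Assigns slot sharing ids purely by position in the traversal order. */
    public static Map<String, Integer> assign(List<Op> traversalOrder) {
        Map<String, Integer> groups = new LinkedHashMap<>();
        int currentId = 1;
        for (Op op : traversalOrder) {
            if (op.startsNewGroup) {
                currentId++;
            }
            groups.put(op.name, currentId);
        }
        return groups;
    }

    public static void main(String[] args) {
        // a.map().startNewResourceGroup().sink(); b.map().sink(); pipeline a traversed first
        List<Op> aFirst = List.of(
            new Op("source a", false), new Op("map a", true), new Op("sink a", false),
            new Op("source b", false), new Op("map b", false), new Op("sink b", false));
        // prints {source a=1, map a=2, sink a=2, source b=2, map b=2, sink b=2}
        System.out.println(assign(aFirst));

        // Same program, but a hypothetical traversal that visits both sources before map a
        List<Op> sourcesFirst = List.of(
            new Op("source a", false), new Op("source b", false), new Op("map a", true),
            new Op("sink a", false), new Op("map b", false), new Op("sink b", false));
        // prints {source a=1, source b=1, map a=2, sink a=2, map b=2, sink b=2}
        System.out.println(assign(sourcesFirst));
    }
}
```

The first traversal reproduces the two groups listed in the issue; with the second, `source b` lands in group 1 although the program is unchanged. Named slot sharing groups remove this dependence on traversal order.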
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159265#comment-15159265 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/1641

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159233#comment-15159233 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1641#discussion_r53816034

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java ---
@@ -40,17 +48,142 @@ public void test() {
             public boolean filter(Long value) { return false; }
         };
-        env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-            .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-            .startNewChain().print();
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 1")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
+
+        // verify that a second pipeline does not inherit the groups from the first pipeline
+        env.generateSequence(1, 10)
+            .filter(dummyFilter).slotSharingGroup("isolated-2")
+            .filter(dummyFilter).slotSharingGroup("default").disableChaining()
+            .filter(dummyFilter).slotSharingGroup("group 2")
+            .filter(dummyFilter).startNewChain()
+            .print().disableChaining();
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
+        assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+        assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+        assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
+        assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
+
+        int pipelineStart = 6;
+        assertEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(pipelineStart + 2).getSlotSharingGroup());
+        assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(pipelineStart + 1).getSlotSharingGroup());
+        assertNotEquals(vertices.get(pipelineStart + 2).getSlotSharingGroup(), vertices.get(pipelineStart + 3).getSlotSharingGroup());
+        assertEquals(vertices.get(pipelineStart + 3).getSlotSharingGroup(), vertices.get(pipelineStart + 4).getSlotSharingGroup());
+        assertEquals(vertices.get(pipelineStart + 4).getSlotSharingGroup(), vertices.get(pipelineStart + 5).getSlotSharingGroup());
--- End diff --

Same here.
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159215#comment-15159215 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1641#issuecomment-187797165

As far as I can tell, the changes look good to me. However, the documentation is missing: I think the old `startNewResourceGroup` and `isolateResources` should be removed from it and the new feature should be documented instead.
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159181#comment-15159181 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1641#discussion_r53811188

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
@@ -533,4 +563,33 @@ private StreamGraph generateInternal(List<StreamTransformation<?>> transformatio
         return Collections.singleton(transform.getId());
     }
+    /**
+     * Determines the slot sharing group for an operation based on the slot sharing group set by
+     * the user and the slot sharing groups of the inputs.
+     *
+     * If the user specifies a group name, this is taken as is. If nothing is specified and
+     * the input operations all have the same group name then this name is taken. Otherwise the
+     * default group is chosen.
+     *
+     * @param specifiedGroup The group specified by the user.
+     * @param inputIds The IDs of the input operations.
+     */
+    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
+        if (specifiedGroup != null) {
+            return specifiedGroup;
+        } else {
+            String inputGroup = null;
+            for (int id: inputIds) {
+                String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
+                if (inputGroup == null) {
+                    inputGroup = inputGroupCandidate;
+                    continue;
+                }
+                if (!inputGroup.equals(inputGroupCandidate)) {
--- End diff --

fixed
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159154#comment-15159154 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1641#discussion_r53808877

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
@@ -533,4 +563,33 @@ private StreamGraph generateInternal(List<StreamTransformation<?>> transformatio
         return Collections.singleton(transform.getId());
     }
+    /**
+     * Determines the slot sharing group for an operation based on the slot sharing group set by
+     * the user and the slot sharing groups of the inputs.
+     *
+     * If the user specifies a group name, this is taken as is. If nothing is specified and
+     * the input operations all have the same group name then this name is taken. Otherwise the
+     * default group is chosen.
+     *
+     * @param specifiedGroup The group specified by the user.
+     * @param inputIds The IDs of the input operations.
+     */
+    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
+        if (specifiedGroup != null) {
+            return specifiedGroup;
+        } else {
+            String inputGroup = null;
+            for (int id: inputIds) {
+                String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
+                if (inputGroup == null) {
+                    inputGroup = inputGroupCandidate;
+                    continue;
+                }
+                if (!inputGroup.equals(inputGroupCandidate)) {
--- End diff --

Can't we get rid of the `continue` statement by making this an `else if (...)` branch?
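Till's `else if` suggestion could look like the following standalone sketch. A `Map` stands in for `streamGraph.getSlotSharingGroup(id)`, and because the quoted diff stops at the `if`, the code after the loop is an assumption based on the Javadoc: inherit the inputs' group when they all agree, otherwise fall back to the default group.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Sketch of determineSlotSharingGroup using else-if instead of continue. The map parameter
 * is a hypothetical stand-in for streamGraph.getSlotSharingGroup(id), and the return
 * statement after the loop is assumed from the Javadoc rather than taken from the PR.
 */
public class SlotSharingGroupSketch {

    static final String DEFAULT_GROUP = "default";

    public static String determineSlotSharingGroup(
            String specifiedGroup, Collection<Integer> inputIds, Map<Integer, String> groupOfInput) {
        if (specifiedGroup != null) {
            // An explicitly specified group is taken as is.
            return specifiedGroup;
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String inputGroupCandidate = groupOfInput.get(id);
            if (inputGroup == null) {
                inputGroup = inputGroupCandidate;
            } else if (!inputGroup.equals(inputGroupCandidate)) {
                // Inputs disagree: fall back to the default group.
                return DEFAULT_GROUP;
            }
        }
        // Assumption: all inputs agree, so inherit their group; with no inputs, use the default.
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }

    public static void main(String[] args) {
        Map<Integer, String> groups = Map.of(1, "heavy a", 2, "heavy a", 3, "sinks");
        System.out.println(determineSlotSharingGroup("sources", List.of(1, 3), groups)); // sources
        System.out.println(determineSlotSharingGroup(null, List.of(1, 2), groups));      // heavy a
        System.out.println(determineSlotSharingGroup(null, List.of(1, 3), groups));      // default
        System.out.println(determineSlotSharingGroup(null, List.of(), groups));          // default
    }
}
```

Returning early on the first mismatch also makes the `else` around the loop unnecessary, since the specified-group case already returns.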
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157025#comment-15157025 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1641#issuecomment-187190568

Updated on top of master again. This should have the behavior that we want for 1.0.
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150571#comment-15150571 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1641#issuecomment-185230357

    rebasing

> Fix Slot Sharing in Streaming API
> ---------------------------------
>
>                 Key: FLINK-3315
>                 URL: https://issues.apache.org/jira/browse/FLINK-3315
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The
> slot sharing group that operators are put in depends on the order in which
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink()
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the
> {{startNewResourceGroup()}} call, and all operators that are transformed
> afterwards in the graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}}, which can be used to isolate an
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and
> {{isolateResources()}} and replace them with {{slotSharingGroup(String)}}. By
> default, operations would be in slot sharing group "default". This allows
> very fine-grained control over which operators end up in which slot sharing
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks")
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> This would isolate the lightweight sources and sinks in one group and put
> the heavy operations in their own slot sharing groups.
> This is a bit lower-level than the previous API and requires more calls
> than a simple {{startNewResourceGroup()}}, but I think not many people would
> use this feature, and this design makes it very clear which operations end up
> in the same group.
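The order dependence described in the issue can be reproduced with a minimal model, assuming a single global counter and the traversal order source a, map a, sink a, source b, map b, sink b. LegacyGroupSketch, Op and startsNewGroup are hypothetical names; Flink's actual graph translation is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the behaviour described in the issue, not Flink code:
// a single slot sharing id is incremented whenever the traversal reaches an
// operator on which startNewResourceGroup() was called, and every operator
// visited afterwards receives the new id, whichever pipeline it belongs to.
class LegacyGroupSketch {
    static final class Op {
        final String name;
        final boolean startsNewGroup; // stands in for a startNewResourceGroup() call

        Op(String name, boolean startsNewGroup) {
            this.name = name;
            this.startsNewGroup = startsNewGroup;
        }
    }

    // Returns the slot sharing id of each operator, in traversal order.
    static List<Integer> assignIds(List<Op> traversalOrder) {
        List<Integer> ids = new ArrayList<>();
        int currentId = 1;
        for (Op op : traversalOrder) {
            if (op.startsNewGroup) {
                currentId++; // global state: leaks into everything visited later
            }
            ids.add(currentId);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Assumed traversal order: pipeline a first, then pipeline b.
        List<Op> order = List.of(
                new Op("source a", false),
                new Op("map a", true), // a.map().startNewResourceGroup()
                new Op("sink a", false),
                new Op("source b", false),
                new Op("map b", false),
                new Op("sink b", false));
        System.out.println(assignIds(order)); // [1, 2, 2, 2, 2, 2]
    }
}
```

Under that assumed order, only source a keeps id 1, and pipeline b is pulled into group 2 even though nothing in it asked for a new group, which matches the two resource groups listed in the issue.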
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150468#comment-15150468 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1641#issuecomment-185196574

    I think this needs to be rebased on top of the latest master. The annotations are still called `@Experimental`.
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148658#comment-15148658 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1641#discussion_r53017220

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
    @@ -202,6 +203,29 @@ public String getUid() {
         }
     
         /**
    +     * Returns the slot sharing group of this transformation.
    +     *
    +     * @see #setSlotSharingGroup(String)
    +     */
    +    public String getSlotSharingGroup() {
    +        return slotSharingGroup;
    +    }
    +
    +    /**
    +     * Sets the slot sharing group of this transformation. Parallels instances of operations that
    --- End diff --

    Fixed
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148596#comment-15148596 ]

ASF GitHub Bot commented on FLINK-3315:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1641#discussion_r53010068

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
    @@ -202,6 +203,29 @@ public String getUid() {
         }
     
         /**
    +     * Returns the slot sharing group of this transformation.
    +     *
    +     * @see #setSlotSharingGroup(String)
    +     */
    +    public String getSlotSharingGroup() {
    +        return slotSharingGroup;
    +    }
    +
    +    /**
    +     * Sets the slot sharing group of this transformation. Parallels instances of operations that
    --- End diff --

    Parallels typo (also in other variants of this method)
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148548#comment-15148548 ]

ASF GitHub Bot commented on FLINK-3315:
---

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1641

    [FLINK-3315] Fix Slot Sharing in Streaming API

    This changes the slot sharing settings to a single method,
    slotSharingGroup(String), on DataStream. Operations inherit the slot
    sharing group of their input if all input operations are in the same slot
    sharing group. The default slot sharing group is "default"; it can also be
    set explicitly using slotSharingGroup("default"), which overrides the
    inheriting behaviour.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink slotsharing-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1641

----
commit f9bd1d25b26e639318dc737c3f7a1ce75df445fe
Author: Aljoscha Krettek
Date:   2016-02-02T12:11:12Z

    [FLINK-3315] Fix Slot Sharing in Streaming API

    This changes the slot sharing settings to a single method,
    slotSharingGroup(String), on DataStream. Operations inherit the slot
    sharing group of their input if all input operations are in the same slot
    sharing group. The default slot sharing group is "default"; it can also be
    set explicitly using slotSharingGroup("default"), which overrides the
    inheriting behaviour.
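The inheritance rule in the pull request description can be sketched as a small resolution function. This is a hypothetical model, not the code in the pull request: InheritedGroupSketch and resolve are illustrative names, and falling back to "default" when inputs come from different groups is an assumption read from "inherit ... if all input operations are in the same slot sharing group".

```java
import java.util.List;

// Hypothetical sketch of the rule stated in the pull request description; the
// names and the fallback for mixed inputs are assumptions, not Flink's code.
class InheritedGroupSketch {
    static final String DEFAULT_GROUP = "default";

    // explicitGroup: the value passed to slotSharingGroup(...), or null if none.
    // inputGroups: the already resolved groups of the operation's inputs.
    static String resolve(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            // An explicit setting wins, even slotSharingGroup("default").
            return explicitGroup;
        }
        boolean allSame = !inputGroups.isEmpty()
                && inputGroups.stream().distinct().count() == 1;
        if (allSame) {
            return inputGroups.get(0); // inherit the common group of the inputs
        }
        // Sources (no inputs) and, by assumption, inputs from different groups.
        return DEFAULT_GROUP;
    }

    public static void main(String[] args) {
        String source = resolve("isolated", List.of());
        String filter = resolve(null, List.of(source));          // inherits "isolated"
        String reset = resolve("default", List.of(filter));      // explicitly back to "default"
        String union = resolve(null, List.of("group 1", reset)); // mixed inputs
        System.out.println(source + ", " + filter + ", " + reset + ", " + union);
    }
}
```

The explicit check comes first so that calling slotSharingGroup("default") on a downstream operation is the way to leave an inherited group, as the description says it overrides the inheriting behaviour.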