Repository: flink Updated Branches: refs/heads/master 7f0ce1428 -> ca82b0cc3
[FLINK-1948] [streaming] Manual task slot sharing settings added for stream operators Closes #634 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca82b0cc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca82b0cc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca82b0cc Branch: refs/heads/master Commit: ca82b0cc36added997cc60aa246a53b76970a355 Parents: 7f0ce14 Author: Gyula Fora <gyf...@apache.org> Authored: Tue Apr 28 23:39:32 2015 +0200 Committer: mbalassi <mbala...@apache.org> Committed: Wed Apr 29 14:05:55 2015 +0200 ---------------------------------------------------------------------- .../datastream/SingleOutputStreamOperator.java | 73 ++++++++++++++++++-- .../temporal/StreamCrossOperator.java | 2 +- .../environment/StreamExecutionEnvironment.java | 12 ++++ .../streaming/api/graph/JSONGenerator.java | 12 ++-- .../flink/streaming/api/graph/StreamGraph.java | 73 +++++++++++++------- .../flink/streaming/api/graph/StreamNode.java | 16 +++++ .../api/graph/StreamingJobGraphGenerator.java | 30 +++++--- .../streaming/api/graph/WindowingOptimizer.java | 10 +-- .../streaming/api/operators/StreamOperator.java | 12 +++- .../flink/streaming/api/CoStreamTest.java | 3 +- .../streaming/api/graph/SlotAllocationTest.java | 62 +++++++++++++++++ .../api/operators/co/SelfConnectionTest.java | 5 +- .../flink/streaming/api/scala/DataStream.scala | 62 ++++++++++++++++- .../api/scala/StreamCrossOperator.scala | 2 +- .../api/scala/StreamExecutionEnvironment.scala | 11 +++ 15 files changed, 327 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index aa70e3f..8141b75 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy; @@ -57,8 +58,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato } /** - * Sets the parallelism for this operator. The degree must be 1 or - * more. + * Sets the parallelism for this operator. The degree must be 1 or more. * * @param parallelism * The parallelism for this operator. @@ -118,11 +118,43 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato return new SingleOutputStreamOperator<OUT, O>(this); } - public SingleOutputStreamOperator<OUT, O> setChainingStrategy(ChainingStrategy strategy) { + /** + * Sets the {@link ChainingStrategy} for the given operator affecting the + * way operators will possibly be co-located on the same thread for + * increased performance. 
+ * + * @param strategy + * The selected {@link ChainingStrategy} + * @return The operator with the modified chaining strategy + */ + private SingleOutputStreamOperator<OUT, O> setChainingStrategy(ChainingStrategy strategy) { this.operator.setChainingStrategy(strategy); return this; } - + + /** + * Turns off chaining for this operator so thread co-location will not be + * used as an optimization. </p> Chaining can be turned off for the whole + * job by {@link StreamExecutionEnvironment#disableOperatorChaning()} + * however it is not advised for performance considerations. + * + * @return The operator with chaining disabled + */ + public SingleOutputStreamOperator<OUT, O> disableChaining() { + return setChainingStrategy(ChainingStrategy.NEVER); + } + + /** + * Starts a new task chain beginning at this operator. This operator will + * not be chained (thread co-located for increased performance) to any + * previous tasks even if possible. + * + * @return The operator with chaining set. + */ + public SingleOutputStreamOperator<OUT, O> startNewChain() { + return setChainingStrategy(ChainingStrategy.HEAD); + } + /** * Adds a type information hint about the return type of this operator. * @@ -237,4 +269,37 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato } } + /** + * By default all operators in a streaming job share the same resource + * group. Each resource group takes as many task manager slots as the + * maximum parallelism operator in that group. Task chaining is only + * possible within one resource group. By calling this method, this + * operators starts a new resource group and all subsequent operators will + * be added to this group unless specified otherwise. </p> Please note that + * local executions have by default as many available task slots as the + * environment parallelism, so in order to start a new resource group the + * degree of parallelism for the operators must be decreased from the + * default. 
+ * + * @return The operator as a part of a new resource group. + */ + public SingleOutputStreamOperator<OUT, O> startNewResourceGroup() { + streamGraph.setResourceStrategy(getId(), ResourceStrategy.NEWGROUP); + return this; + } + + /** + * Isolates the operator in its own resource group. This will cause the + * operator to grab as many task slots as its degree of parallelism. If + * there are no free resources available, the job will fail to start. It + * also disables chaining for this operator </p>All subsequent operators are + * assigned to the default resource group. + * + * @return The operator with isolated resource group. + */ + public SingleOutputStreamOperator<OUT, O> isolateResources() { + streamGraph.setResourceStrategy(getId(), ResourceStrategy.ISOLATE); + return this; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java index dbd295f..3dd02a3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java @@ -76,7 +76,7 @@ public class StreamCrossOperator<I1, I2> extends @SuppressWarnings("unchecked") public CrossWindow<I1, I2> every(long length) { - ((CoStreamWindow<I1, I2, ?>) streamGraph.getVertex(id).getOperator()) + ((CoStreamWindow<I1, I2, ?>) streamGraph.getStreamNode(id).getOperator()) 
.setSlideSize(length); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 3e935f5..b0471f9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -190,6 +190,18 @@ public abstract class StreamExecutionEnvironment { } /** + * Disables operator chaining for streaming operators. Operator chaining + * allows non-shuffle operations to be co-located in the same thread fully + * avoiding serialization and de-serialization. + * + * @return StreamExecutionEnvironment with chaining disabled. + */ + public StreamExecutionEnvironment disableOperatorChaning() { + streamGraph.setChaining(false); + return this; + } + + /** * Method for enabling fault-tolerance. Activates monitoring and backup of * streaming operator states. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java index 8d8ded9..a0e0a36 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -60,7 +60,7 @@ public class JSONGenerator { Map<Integer, Integer> edgeRemapings) throws JSONException { Integer vertexID = toVisit.get(0); - StreamNode vertex = streamGraph.getVertex(vertexID); + StreamNode vertex = streamGraph.getStreamNode(vertexID); if (streamGraph.getSourceIDs().contains(vertexID) || Collections.disjoint(vertex.getInEdges(), toVisit)) { @@ -97,7 +97,7 @@ public class JSONGenerator { obj.put(STEPS, iterationSteps); obj.put(ID, iterationHead); obj.put(PACT, "IterativeDataStream"); - obj.put(PARALLELISM, streamGraph.getVertex(iterationHead).getParallelism()); + obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism()); obj.put(CONTENTS, "Stream Iteration"); JSONArray iterationInputs = new JSONArray(); obj.put(PREDECESSORS, iterationInputs); @@ -115,7 +115,7 @@ public class JSONGenerator { Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException { Integer vertexID = toVisit.get(0); - StreamNode vertex = streamGraph.getVertex(vertexID); + StreamNode vertex = streamGraph.getStreamNode(vertexID); toVisit.remove(vertexID); // Ignoring head and tail to avoid redundancy @@ -154,7 +154,7 @@ public class JSONGenerator { 
private void decorateNode(Integer vertexID, JSONObject node) throws JSONException { - StreamNode vertex = streamGraph.getVertex(vertexID); + StreamNode vertex = streamGraph.getStreamNode(vertexID); node.put(ID, vertexID); node.put(TYPE, vertex.getOperatorName()); @@ -165,7 +165,7 @@ public class JSONGenerator { node.put(PACT, "Data Stream"); } - StreamOperator<?, ?> operator = streamGraph.getVertex(vertexID).getOperator(); + StreamOperator<?, ?> operator = streamGraph.getStreamNode(vertexID).getOperator(); if (operator != null && operator.getUserFunction() != null) { node.put(CONTENTS, vertex.getOperatorName() + " at " @@ -174,7 +174,7 @@ public class JSONGenerator { node.put(CONTENTS, vertex.getOperatorName()); } - node.put(PARALLELISM, streamGraph.getVertex(vertexID).getParallelism()); + node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index bfeed28..93bf8eb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -173,15 +173,15 @@ public class StreamGraph extends StreamingPlan { chaining = false; - StreamLoop iteration = new StreamLoop(iterationID, getVertex(iterationHead), timeOut); + StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(iterationHead), timeOut); streamLoops.put(iterationID, 
iteration); vertexIDtoLoop.put(vertexID, iteration); setSerializersFrom(iterationHead, vertexID); - getVertex(vertexID).setOperatorName("IterationHead-" + iterationHead); + getStreamNode(vertexID).setOperatorName("IterationHead-" + iterationHead); - int outpartitionerIndex = getVertex(iterationHead).getInEdgeIndices().get(0); - StreamPartitioner<?> outputPartitioner = getVertex(outpartitionerIndex).getOutEdges() + int outpartitionerIndex = getStreamNode(iterationHead).getInEdgeIndices().get(0); + StreamPartitioner<?> outputPartitioner = getStreamNode(outpartitionerIndex).getOutEdges() .get(0).getPartitioner(); addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>()); @@ -196,22 +196,23 @@ public class StreamGraph extends StreamingPlan { public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID, long waitTime) { - if (getVertex(iterationTail).getBufferTimeout() == 0) { + if (getStreamNode(iterationTail).getBufferTimeout() == 0) { throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); } addNode(vertexID, StreamIterationTail.class, null, null).setParallelism( - getVertex(iterationTail).getParallelism()); + getStreamNode(iterationTail).getParallelism()); StreamLoop iteration = streamLoops.get(iterationID); - iteration.setTail(getVertex(iterationTail)); + iteration.setTail(getStreamNode(iterationTail)); vertexIDtoLoop.put(vertexID, iteration); setSerializersFrom(iterationTail, vertexID); - getVertex(vertexID).setOperatorName("IterationTail-" + iterationTail); + getStreamNode(vertexID).setOperatorName("IterationTail-" + iterationTail); - setParallelism(iteration.getHead().getID(), getVertex(iterationTail).getParallelism()); - setBufferTimeout(iteration.getHead().getID(), getVertex(iterationTail).getBufferTimeout()); + setParallelism(iteration.getHead().getID(), getStreamNode(iterationTail).getParallelism()); + setBufferTimeout(iteration.getHead().getID(), getStreamNode(iterationTail) + 
.getBufferTimeout()); if (LOG.isDebugEnabled()) { LOG.debug("ITERATION SINK: {}", vertexID); @@ -233,14 +234,14 @@ public class StreamGraph extends StreamingPlan { public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) { - StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID), - getVertex(downStreamVertexID), typeNumber, outputNames, partitionerObject); - getVertex(edge.getSourceID()).addOutEdge(edge); - getVertex(edge.getTargetID()).addInEdge(edge); + StreamEdge edge = new StreamEdge(getStreamNode(upStreamVertexID), + getStreamNode(downStreamVertexID), typeNumber, outputNames, partitionerObject); + getStreamNode(edge.getSourceID()).addOutEdge(edge); + getStreamNode(edge.getTargetID()).addInEdge(edge); } public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) { - getVertex(vertexID).addOutputSelector(outputSelector); + getStreamNode(vertexID).addOutputSelector(outputSelector); if (LOG.isDebugEnabled()) { LOG.debug("Outputselector set for {}", vertexID); @@ -249,24 +250,24 @@ public class StreamGraph extends StreamingPlan { } public void setParallelism(Integer vertexID, int parallelism) { - getVertex(vertexID).setParallelism(parallelism); + getStreamNode(vertexID).setParallelism(parallelism); } public void setBufferTimeout(Integer vertexID, long bufferTimeout) { - getVertex(vertexID).setBufferTimeout(bufferTimeout); + getStreamNode(vertexID).setBufferTimeout(bufferTimeout); } private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1, StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) { - StreamNode vertex = getVertex(vertexID); + StreamNode vertex = getStreamNode(vertexID); vertex.setSerializerIn1(in1); vertex.setSerializerIn2(in2); vertex.setSerializerOut(out); } private void setSerializersFrom(Integer from, Integer to) { - StreamNode fromVertex = getVertex(from); - StreamNode toVertex = 
getVertex(to); + StreamNode fromVertex = getStreamNode(from); + StreamNode toVertex = getStreamNode(to); toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut()); toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1()); @@ -275,18 +276,32 @@ public class StreamGraph extends StreamingPlan { public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) { StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType, executionConfig); - getVertex(vertexID).setSerializerOut(serializer); + getStreamNode(vertexID).setSerializerOut(serializer); } public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<IN, OUT> operatorObject) { - getVertex(vertexID).setOperator(operatorObject); + getStreamNode(vertexID).setOperator(operatorObject); } public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) { - getVertex(vertexID).setInputFormat(inputFormat); + getStreamNode(vertexID).setInputFormat(inputFormat); + } + + public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) { + StreamNode node = getStreamNode(vertexID); + switch (strategy) { + case ISOLATE: + node.isolateSlot(); + break; + case NEWGROUP: + node.startNewSlotSharingGroup(); + break; + default: + throw new IllegalArgumentException("Unknown resource strategy"); + } } - public StreamNode getVertex(Integer vertexID) { + public StreamNode getStreamNode(Integer vertexID) { return streamNodes.get(vertexID); } @@ -295,7 +310,7 @@ public class StreamGraph extends StreamingPlan { } protected StreamEdge getEdge(int sourceId, int targetId) { - Iterator<StreamEdge> outIterator = getVertex(sourceId).getOutEdges().iterator(); + Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator(); while (outIterator.hasNext()) { StreamEdge edge = outIterator.next(); @@ -311,6 +326,10 @@ public class StreamGraph extends StreamingPlan { return sources; } + public Collection<StreamNode> getStreamNodes() { + return 
streamNodes.values(); + } + public Set<Tuple2<Integer, StreamOperator<?, ?>>> getOperators() { Set<Tuple2<Integer, StreamOperator<?, ?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?, ?>>>(); for (StreamNode vertex : streamNodes.values()) { @@ -414,6 +433,10 @@ public class StreamGraph extends StreamingPlan { } } + public static enum ResourceStrategy { + DEFAULT, ISOLATE, NEWGROUP + } + /** * Object for representing loops in streaming programs. * http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 9672035..7137e3e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; public class StreamNode implements Serializable { private static final long serialVersionUID = 1L; + private static int currentSlotSharingIndex = 1; transient private StreamExecutionEnvironment env; @@ -45,6 +46,8 @@ public class StreamNode implements Serializable { private Integer parallelism = null; private Long bufferTimeout = null; private String operatorName; + private Integer slotSharingID; + private boolean isolatedSlot = false; private transient StreamOperator<?, ?> operator; private List<OutputSelector<?>> outputSelectors; @@ -68,6 +71,7 @@ public class StreamNode implements Serializable { this.operator = operator; this.outputSelectors = 
outputSelector; this.jobVertexClass = jobVertexClass; + this.slotSharingID = currentSlotSharingIndex; } public void addInEdge(StreamEdge inEdge) { @@ -198,4 +202,16 @@ public class StreamNode implements Serializable { this.inputFormat = inputFormat; } + public int getSlotSharingID() { + return isolatedSlot ? -1 : slotSharingID; + } + + public void startNewSlotSharingGroup() { + this.slotSharingID = ++currentSlotSharingIndex; + } + + public void isolateSlot() { + isolatedSlot = true; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index a33c118..1016fa0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.lang.StringUtils; import org.apache.flink.configuration.Configuration; @@ -140,7 +141,7 @@ public class StreamingJobGraphGenerator { List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>(); List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>(); - for (StreamEdge outEdge : streamGraph.getVertex(current).getOutEdges()) { + for (StreamEdge outEdge : streamGraph.getStreamNode(current).getOutEdges()) { if (isChainable(outEdge)) { 
chainableOutputs.add(outEdge); } else { @@ -168,7 +169,7 @@ public class StreamingJobGraphGenerator { config.setChainStart(); config.setOutEdgesInOrder(transitiveOutEdges); - config.setOutEdges(streamGraph.getVertex(current).getOutEdges()); + config.setOutEdges(streamGraph.getStreamNode(current).getOutEdges()); for (StreamEdge edge : transitiveOutEdges) { connect(startNode, edge); @@ -194,7 +195,7 @@ public class StreamingJobGraphGenerator { } private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) { - String operatorName = streamGraph.getVertex(vertexID).getOperatorName(); + String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName(); if (chainedOutputs.size() > 1) { List<String> outputChainedNames = new ArrayList<String>(); for (StreamEdge chainable : chainedOutputs) { @@ -216,7 +217,7 @@ public class StreamingJobGraphGenerator { private StreamConfig createProcessingVertex(Integer vertexID) { AbstractJobVertex jobVertex = new AbstractJobVertex(chainedNames.get(vertexID)); - StreamNode vertex = streamGraph.getVertex(vertexID); + StreamNode vertex = streamGraph.getStreamNode(vertexID); jobVertex.setInvokableClass(vertex.getJobVertexClass()); @@ -246,7 +247,7 @@ public class StreamingJobGraphGenerator { private void setVertexConfig(Integer vertexID, StreamConfig config, List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) { - StreamNode vertex = streamGraph.getVertex(vertexID); + StreamNode vertex = streamGraph.getStreamNode(vertexID); config.setVertexID(vertexID); config.setBufferTimeout(vertex.getBufferTimeout()); @@ -317,6 +318,8 @@ public class StreamingJobGraphGenerator { return downStreamVertex.getInEdges().size() == 1 && outOperator != null + && upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID() + && upStreamVertex.getSlotSharingID() != -1 && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || 
headOperator .getChainingStrategy() == ChainingStrategy.ALWAYS) @@ -327,10 +330,21 @@ public class StreamingJobGraphGenerator { } private void setSlotSharing() { - SlotSharingGroup shareGroup = new SlotSharingGroup(); - for (AbstractJobVertex vertex : jobVertices.values()) { - vertex.setSlotSharingGroup(shareGroup); + Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<Integer, SlotSharingGroup>(); + + for (Entry<Integer, AbstractJobVertex> entry : jobVertices.entrySet()) { + + int slotSharingID = streamGraph.getStreamNode(entry.getKey()).getSlotSharingID(); + + if (slotSharingID != -1) { + SlotSharingGroup group = slotSharingGroups.get(slotSharingID); + if (group == null) { + group = new SlotSharingGroup(); + slotSharingGroups.put(slotSharingID, group); + } + entry.getValue().setSlotSharingGroup(group); + } } for (StreamLoop loop : streamGraph.getStreamLoops()) { http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java index b688ea4..dfcdc8d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java @@ -53,7 +53,7 @@ public class WindowingOptimizer { for (Integer flattenerID : flatteners) { // Flatteners should have exactly one input - StreamNode input = streamGraph.getVertex(flattenerID).getInEdges().get(0) + StreamNode input = streamGraph.getStreamNode(flattenerID).getInEdges().get(0) 
.getSourceVertex(); // Check whether the flatten is applied after a merge @@ -98,9 +98,9 @@ public class WindowingOptimizer { for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : discretizers) { boolean inMatching = false; for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) { - Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getVertex( + Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getStreamNode( discretizer.f0).getInEdgeIndices()); - Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getVertex( + Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getStreamNode( matching.f1.get(0)).getInEdgeIndices()); if (discretizer.f1.equals(matching.f0) @@ -132,7 +132,7 @@ public class WindowingOptimizer { private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID, Integer replaceWithID) { // Convert to array to create a copy - List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getVertex(toReplaceID) + List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getStreamNode(toReplaceID) .getOutEdges()); int numOutputs = outEdges.size(); @@ -146,6 +146,6 @@ public class WindowingOptimizer { } // Remove the other discretizer - streamGraph.removeVertex(streamGraph.getVertex(toReplaceID)); + streamGraph.removeVertex(streamGraph.getStreamNode(toReplaceID)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 5cb3ec9..a361b7a 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -113,7 +113,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable { // Task already cancelled do nothing return null; } - } catch (IllegalStateException e) { + } catch (IllegalStateException e) { if (isRunning) { throw new RuntimeException("Could not read next record due to: " + StringUtils.stringifyException(e)); @@ -198,6 +198,16 @@ public abstract class StreamOperator<IN, OUT> implements Serializable { return chainingStrategy; } + /** + * Defines the chaining scheme for the operator. By default <b>ALWAYS</b> is used, + * which means operators will be eagerly chained whenever possible, for + * maximal performance. It is generally a good practice to allow maximal + * chaining and increase operator parallelism. </p> When the strategy is set + * to <b>NEVER</b>, the operator will not be chained to the preceding or succeeding + * operators.</p> <b>HEAD</b> strategy marks a start of a new chain, so that the + * operator will not be chained to preceding operators, only succeding ones. 
+ * + */ public static enum ChainingStrategy { ALWAYS, NEVER, HEAD } http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java index 4228314..4b3543b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java @@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; -import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.util.TestListResultSink; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.util.Collector; @@ -89,7 +88,7 @@ public class CoStreamTest { public boolean filter(Tuple2<Integer, Integer> value) throws Exception { return true; } - }).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER).groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() { + }).disableChaining().groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java new file mode 100644 index 0000000..94636f7 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.Test; + +public class SlotAllocationTest { + + @SuppressWarnings("serial") + @Test + public void test() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8); + + FilterFunction<Long> dummyFilter = new FilterFunction<Long>() { + + @Override + public boolean filter(Long value) throws Exception { + + return false; + } + }; + + env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter) + .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter) + .startNewChain().print(); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + + List<AbstractJobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + + assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup()); + assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1) + .getSlotSharingGroup()); + assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3) + .getSlotSharingGroup()); + assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup()); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java index ca0c1dc..5986a30 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java @@ -33,7 +33,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.util.TestListResultSink; import org.apache.flink.streaming.util.TestStreamEnvironment; @@ -132,7 +131,7 @@ public class SelfConnectionTest implements Serializable { public String map(Integer value) throws Exception { return "x " + value; } - }).setChainingStrategy(StreamOperator.ChainingStrategy.ALWAYS); + }); stringMap.connect(src).map(new CoMapFunction<String, Integer, String>() { @@ -178,7 +177,7 @@ public class SelfConnectionTest implements Serializable { StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE); - DataStream<Integer> src = env.fromElements(1, 3, 5).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER); + DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining(); DataStream<String> stringMap = src.flatMap(new FlatMapFunction<Integer, String>() { http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 41be9d4..ee2afe6 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -95,9 +95,67 @@ class DataStream[T](javaStream: JavaStream[T]) { "parallelism.") } - def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = { + /** + * Turns off chaining for this operator so thread co-location will not be + * used as an optimization. </p> Chaining can be turned off for the whole + * job by {@link StreamExecutionEnvironment#disableOperatorChaning()} + * however it is not advised for performance considerations. + * + */ + def disableChaining(): DataStream[T] = { + javaStream match { + case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining(); + case _ => + throw new UnsupportedOperationException("Only supported for operators.") + } + this + } + + /** + * Starts a new task chain beginning at this operator. This operator will + * not be chained (thread co-located for increased performance) to any + * previous tasks even if possible. + * + */ + def startNewChain(): DataStream[T] = { + javaStream match { + case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain(); + case _ => + throw new UnsupportedOperationException("Only supported for operators.") + } + this + } + + /** + * Isolates the operator in its own resource group. This will cause the + * operator to grab as many task slots as its degree of parallelism. If + * there are no free resources available, the job will fail to start. + * All subsequent operators are assigned to the default resource group. 
+ * + */ + def isolateResources(): DataStream[T] = { + javaStream match { + case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources(); + case _ => + throw new UnsupportedOperationException("Only supported for operators.") + } + this + } + + /** + * By default all operators in a streaming job share the same resource + * group. Each resource group takes as many task manager slots as the + * maximum parallelism operator in that group. By calling this method, this + * operator starts a new resource group and all subsequent operators will + * be added to this group unless specified otherwise. Please note that + * local executions have by default as many available task slots as the + * environment parallelism, so in order to start a new resource group the + * degree of parallelism for the operators must be decreased from the + * default. + */ + def startNewResourceGroup(): DataStream[T] = { javaStream match { - case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy) + case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup(); case _ => throw new UnsupportedOperationException("Only supported for operators.") } http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala index d14787c..699c38d 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -100,7 +100,7 @@ object StreamCrossOperator { override def every(length: Long): CrossWindow[I1, I2] = { val graph = javaStream.getExecutionEnvironment().getStreamGraph() - val operator = graph.getVertex(javaStream.getId()).getOperator() + val operator = graph.getStreamNode(javaStream.getId()).getOperator() operator.asInstanceOf[CoStreamWindow[_,_,_]].setSlideSize(length) this } http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index c7716ed..15a8f1d 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -122,6 +122,17 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { javaEnv.enableCheckpointing() this } + + /** + * Disables operator chaining for streaming operators. Operator chaining + * allows non-shuffle operations to be co-located in the same thread fully + * avoiding serialization and de-serialization. + * + */ + def disableOperatorChaning(): StreamExecutionEnvironment = { + javaEnv.disableOperatorChaning() + this + } /** * Sets the number of times that failed tasks are re-executed. A value of zero