Repository: flink
Updated Branches:
  refs/heads/master 7f0ce1428 -> ca82b0cc3


[FLINK-1948] [streaming] Manual task slot sharing settings added for stream 
operators

Closes #634


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca82b0cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca82b0cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca82b0cc

Branch: refs/heads/master
Commit: ca82b0cc36added997cc60aa246a53b76970a355
Parents: 7f0ce14
Author: Gyula Fora <gyf...@apache.org>
Authored: Tue Apr 28 23:39:32 2015 +0200
Committer: mbalassi <mbala...@apache.org>
Committed: Wed Apr 29 14:05:55 2015 +0200

----------------------------------------------------------------------
 .../datastream/SingleOutputStreamOperator.java  | 73 ++++++++++++++++++--
 .../temporal/StreamCrossOperator.java           |  2 +-
 .../environment/StreamExecutionEnvironment.java | 12 ++++
 .../streaming/api/graph/JSONGenerator.java      | 12 ++--
 .../flink/streaming/api/graph/StreamGraph.java  | 73 +++++++++++++-------
 .../flink/streaming/api/graph/StreamNode.java   | 16 +++++
 .../api/graph/StreamingJobGraphGenerator.java   | 30 +++++---
 .../streaming/api/graph/WindowingOptimizer.java | 10 +--
 .../streaming/api/operators/StreamOperator.java | 12 +++-
 .../flink/streaming/api/CoStreamTest.java       |  3 +-
 .../streaming/api/graph/SlotAllocationTest.java | 62 +++++++++++++++++
 .../api/operators/co/SelfConnectionTest.java    |  5 +-
 .../flink/streaming/api/scala/DataStream.scala  | 62 ++++++++++++++++-
 .../api/scala/StreamCrossOperator.scala         |  2 +-
 .../api/scala/StreamExecutionEnvironment.scala  | 11 +++
 15 files changed, 327 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index aa70e3f..8141b75 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import 
org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
 
@@ -57,8 +58,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
        }
 
        /**
-        * Sets the parallelism for this operator. The degree must be 1 or
-        * more.
+        * Sets the parallelism for this operator. The degree must be 1 or more.
         * 
         * @param parallelism
         *            The parallelism for this operator.
@@ -118,11 +118,43 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                return new SingleOutputStreamOperator<OUT, O>(this);
        }
 
-       public SingleOutputStreamOperator<OUT, O> 
setChainingStrategy(ChainingStrategy strategy) {
+       /**
+        * Sets the {@link ChainingStrategy} for the given operator affecting 
the
+        * way operators will possibly be co-located on the same thread for
+        * increased performance.
+        * 
+        * @param strategy
+        *            The selected {@link ChainingStrategy}
+        * @return The operator with the modified chaining strategy
+        */
+       private SingleOutputStreamOperator<OUT, O> 
setChainingStrategy(ChainingStrategy strategy) {
                this.operator.setChainingStrategy(strategy);
                return this;
        }
-       
+
+       /**
+        * Turns off chaining for this operator so thread co-location will not 
be
+        * used as an optimization. </p> Chaining can be turned off for the 
whole
+        * job by {@link StreamExecutionEnvironment#disableOperatorChaning()}
+        * however it is not advised for performance considerations.
+        * 
+        * @return The operator with chaining disabled
+        */
+       public SingleOutputStreamOperator<OUT, O> disableChaining() {
+               return setChainingStrategy(ChainingStrategy.NEVER);
+       }
+
+       /**
+        * Starts a new task chain beginning at this operator. This operator 
will
+        * not be chained (thread co-located for increased performance) to any
+        * previous tasks even if possible.
+        * 
+        * @return The operator with chaining set.
+        */
+       public SingleOutputStreamOperator<OUT, O> startNewChain() {
+               return setChainingStrategy(ChainingStrategy.HEAD);
+       }
+
        /**
         * Adds a type information hint about the return type of this operator. 
         * 
@@ -237,4 +269,37 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                }
        }
 
+       /**
+        * By default all operators in a streaming job share the same resource
+        * group. Each resource group takes as many task manager slots as the
+        * maximum parallelism operator in that group. Task chaining is only
+        * possible within one resource group. By calling this method, this
+        * operators starts a new resource group and all subsequent operators 
will
+        * be added to this group unless specified otherwise. </p> Please note 
that
+        * local executions have by default as many available task slots as the
+        * environment parallelism, so in order to start a new resource group 
the
+        * degree of parallelism for the operators must be decreased from the
+        * default.
+        * 
+        * @return The operator as a part of a new resource group.
+        */
+       public SingleOutputStreamOperator<OUT, O> startNewResourceGroup() {
+               streamGraph.setResourceStrategy(getId(), 
ResourceStrategy.NEWGROUP);
+               return this;
+       }
+
+       /**
+        * Isolates the operator in its own resource group. This will cause the
+        * operator to grab as many task slots as its degree of parallelism. If
+        * there are no free resources available, the job will fail to start. It
+        * also disables chaining for this operator </p>All subsequent 
operators are
+        * assigned to the default resource group.
+        * 
+        * @return The operator with isolated resource group.
+        */
+       public SingleOutputStreamOperator<OUT, O> isolateResources() {
+               streamGraph.setResourceStrategy(getId(), 
ResourceStrategy.ISOLATE);
+               return this;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
index dbd295f..3dd02a3 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
@@ -76,7 +76,7 @@ public class StreamCrossOperator<I1, I2> extends
 
                @SuppressWarnings("unchecked")
                public CrossWindow<I1, I2> every(long length) {
-                       ((CoStreamWindow<I1, I2, ?>) 
streamGraph.getVertex(id).getOperator())
+                       ((CoStreamWindow<I1, I2, ?>) 
streamGraph.getStreamNode(id).getOperator())
                                        .setSlideSize(length);
                        return this;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3e935f5..b0471f9 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -190,6 +190,18 @@ public abstract class StreamExecutionEnvironment {
        }
 
        /**
+        * Disables operator chaining for streaming operators. Operator chaining
+        * allows non-shuffle operations to be co-located in the same thread 
fully
+        * avoiding serialization and de-serialization.
+        * 
+        * @return StreamExecutionEnvironment with chaining disabled.
+        */
+       public StreamExecutionEnvironment disableOperatorChaning() {
+               streamGraph.setChaining(false);
+               return this;
+       }
+
+       /**
         * Method for enabling fault-tolerance. Activates monitoring and backup 
of
         * streaming operator states.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 8d8ded9..a0e0a36 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -60,7 +60,7 @@ public class JSONGenerator {
                        Map<Integer, Integer> edgeRemapings) throws 
JSONException {
 
                Integer vertexID = toVisit.get(0);
-               StreamNode vertex = streamGraph.getVertex(vertexID);
+               StreamNode vertex = streamGraph.getStreamNode(vertexID);
 
                if (streamGraph.getSourceIDs().contains(vertexID)
                                || Collections.disjoint(vertex.getInEdges(), 
toVisit)) {
@@ -97,7 +97,7 @@ public class JSONGenerator {
                        obj.put(STEPS, iterationSteps);
                        obj.put(ID, iterationHead);
                        obj.put(PACT, "IterativeDataStream");
-                       obj.put(PARALLELISM, 
streamGraph.getVertex(iterationHead).getParallelism());
+                       obj.put(PARALLELISM, 
streamGraph.getStreamNode(iterationHead).getParallelism());
                        obj.put(CONTENTS, "Stream Iteration");
                        JSONArray iterationInputs = new JSONArray();
                        obj.put(PREDECESSORS, iterationInputs);
@@ -115,7 +115,7 @@ public class JSONGenerator {
                        Map<Integer, Integer> edgeRemapings, JSONArray 
iterationInEdges) throws JSONException {
 
                Integer vertexID = toVisit.get(0);
-               StreamNode vertex = streamGraph.getVertex(vertexID);
+               StreamNode vertex = streamGraph.getStreamNode(vertexID);
                toVisit.remove(vertexID);
 
                // Ignoring head and tail to avoid redundancy
@@ -154,7 +154,7 @@ public class JSONGenerator {
 
        private void decorateNode(Integer vertexID, JSONObject node) throws 
JSONException {
 
-               StreamNode vertex = streamGraph.getVertex(vertexID);
+               StreamNode vertex = streamGraph.getStreamNode(vertexID);
 
                node.put(ID, vertexID);
                node.put(TYPE, vertex.getOperatorName());
@@ -165,7 +165,7 @@ public class JSONGenerator {
                        node.put(PACT, "Data Stream");
                }
 
-               StreamOperator<?, ?> operator = 
streamGraph.getVertex(vertexID).getOperator();
+               StreamOperator<?, ?> operator = 
streamGraph.getStreamNode(vertexID).getOperator();
 
                if (operator != null && operator.getUserFunction() != null) {
                        node.put(CONTENTS, vertex.getOperatorName() + " at "
@@ -174,7 +174,7 @@ public class JSONGenerator {
                        node.put(CONTENTS, vertex.getOperatorName());
                }
 
-               node.put(PARALLELISM, 
streamGraph.getVertex(vertexID).getParallelism());
+               node.put(PARALLELISM, 
streamGraph.getStreamNode(vertexID).getParallelism());
        }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index bfeed28..93bf8eb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -173,15 +173,15 @@ public class StreamGraph extends StreamingPlan {
 
                chaining = false;
 
-               StreamLoop iteration = new StreamLoop(iterationID, 
getVertex(iterationHead), timeOut);
+               StreamLoop iteration = new StreamLoop(iterationID, 
getStreamNode(iterationHead), timeOut);
                streamLoops.put(iterationID, iteration);
                vertexIDtoLoop.put(vertexID, iteration);
 
                setSerializersFrom(iterationHead, vertexID);
-               getVertex(vertexID).setOperatorName("IterationHead-" + 
iterationHead);
+               getStreamNode(vertexID).setOperatorName("IterationHead-" + 
iterationHead);
 
-               int outpartitionerIndex = 
getVertex(iterationHead).getInEdgeIndices().get(0);
-               StreamPartitioner<?> outputPartitioner = 
getVertex(outpartitionerIndex).getOutEdges()
+               int outpartitionerIndex = 
getStreamNode(iterationHead).getInEdgeIndices().get(0);
+               StreamPartitioner<?> outputPartitioner = 
getStreamNode(outpartitionerIndex).getOutEdges()
                                .get(0).getPartitioner();
 
                addEdge(vertexID, iterationHead, outputPartitioner, 0, new 
ArrayList<String>());
@@ -196,22 +196,23 @@ public class StreamGraph extends StreamingPlan {
        public void addIterationTail(Integer vertexID, Integer iterationTail, 
Integer iterationID,
                        long waitTime) {
 
-               if (getVertex(iterationTail).getBufferTimeout() == 0) {
+               if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
                        throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
                }
 
                addNode(vertexID, StreamIterationTail.class, null, 
null).setParallelism(
-                               getVertex(iterationTail).getParallelism());
+                               getStreamNode(iterationTail).getParallelism());
 
                StreamLoop iteration = streamLoops.get(iterationID);
-               iteration.setTail(getVertex(iterationTail));
+               iteration.setTail(getStreamNode(iterationTail));
                vertexIDtoLoop.put(vertexID, iteration);
 
                setSerializersFrom(iterationTail, vertexID);
-               getVertex(vertexID).setOperatorName("IterationTail-" + 
iterationTail);
+               getStreamNode(vertexID).setOperatorName("IterationTail-" + 
iterationTail);
 
-               setParallelism(iteration.getHead().getID(), 
getVertex(iterationTail).getParallelism());
-               setBufferTimeout(iteration.getHead().getID(), 
getVertex(iterationTail).getBufferTimeout());
+               setParallelism(iteration.getHead().getID(), 
getStreamNode(iterationTail).getParallelism());
+               setBufferTimeout(iteration.getHead().getID(), 
getStreamNode(iterationTail)
+                               .getBufferTimeout());
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("ITERATION SINK: {}", vertexID);
@@ -233,14 +234,14 @@ public class StreamGraph extends StreamingPlan {
        public void addEdge(Integer upStreamVertexID, Integer 
downStreamVertexID,
                        StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames) {
 
-               StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID),
-                               getVertex(downStreamVertexID), typeNumber, 
outputNames, partitionerObject);
-               getVertex(edge.getSourceID()).addOutEdge(edge);
-               getVertex(edge.getTargetID()).addInEdge(edge);
+               StreamEdge edge = new 
StreamEdge(getStreamNode(upStreamVertexID),
+                               getStreamNode(downStreamVertexID), typeNumber, 
outputNames, partitionerObject);
+               getStreamNode(edge.getSourceID()).addOutEdge(edge);
+               getStreamNode(edge.getTargetID()).addInEdge(edge);
        }
 
        public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> 
outputSelector) {
-               getVertex(vertexID).addOutputSelector(outputSelector);
+               getStreamNode(vertexID).addOutputSelector(outputSelector);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Outputselector set for {}", vertexID);
@@ -249,24 +250,24 @@ public class StreamGraph extends StreamingPlan {
        }
 
        public void setParallelism(Integer vertexID, int parallelism) {
-               getVertex(vertexID).setParallelism(parallelism);
+               getStreamNode(vertexID).setParallelism(parallelism);
        }
 
        public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
-               getVertex(vertexID).setBufferTimeout(bufferTimeout);
+               getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
        }
 
        private void setSerializers(Integer vertexID, StreamRecordSerializer<?> 
in1,
                        StreamRecordSerializer<?> in2, 
StreamRecordSerializer<?> out) {
-               StreamNode vertex = getVertex(vertexID);
+               StreamNode vertex = getStreamNode(vertexID);
                vertex.setSerializerIn1(in1);
                vertex.setSerializerIn2(in2);
                vertex.setSerializerOut(out);
        }
 
        private void setSerializersFrom(Integer from, Integer to) {
-               StreamNode fromVertex = getVertex(from);
-               StreamNode toVertex = getVertex(to);
+               StreamNode fromVertex = getStreamNode(from);
+               StreamNode toVertex = getStreamNode(to);
 
                toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
                toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
@@ -275,18 +276,32 @@ public class StreamGraph extends StreamingPlan {
        public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> 
outType) {
                StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType,
                                executionConfig);
-               getVertex(vertexID).setSerializerOut(serializer);
+               getStreamNode(vertexID).setSerializerOut(serializer);
        }
 
        public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<IN, 
OUT> operatorObject) {
-               getVertex(vertexID).setOperator(operatorObject);
+               getStreamNode(vertexID).setOperator(operatorObject);
        }
 
        public void setInputFormat(Integer vertexID, InputFormat<String, ?> 
inputFormat) {
-               getVertex(vertexID).setInputFormat(inputFormat);
+               getStreamNode(vertexID).setInputFormat(inputFormat);
+       }
+
+       public void setResourceStrategy(Integer vertexID, ResourceStrategy 
strategy) {
+               StreamNode node = getStreamNode(vertexID);
+               switch (strategy) {
+               case ISOLATE:
+                       node.isolateSlot();
+                       break;
+               case NEWGROUP:
+                       node.startNewSlotSharingGroup();
+                       break;
+               default:
+                       throw new IllegalArgumentException("Unknown resource 
strategy");
+               }
        }
 
-       public StreamNode getVertex(Integer vertexID) {
+       public StreamNode getStreamNode(Integer vertexID) {
                return streamNodes.get(vertexID);
        }
 
@@ -295,7 +310,7 @@ public class StreamGraph extends StreamingPlan {
        }
 
        protected StreamEdge getEdge(int sourceId, int targetId) {
-               Iterator<StreamEdge> outIterator = 
getVertex(sourceId).getOutEdges().iterator();
+               Iterator<StreamEdge> outIterator = 
getStreamNode(sourceId).getOutEdges().iterator();
                while (outIterator.hasNext()) {
                        StreamEdge edge = outIterator.next();
 
@@ -311,6 +326,10 @@ public class StreamGraph extends StreamingPlan {
                return sources;
        }
 
+       public Collection<StreamNode> getStreamNodes() {
+               return streamNodes.values();
+       }
+
        public Set<Tuple2<Integer, StreamOperator<?, ?>>> getOperators() {
                Set<Tuple2<Integer, StreamOperator<?, ?>>> operatorSet = new 
HashSet<Tuple2<Integer, StreamOperator<?, ?>>>();
                for (StreamNode vertex : streamNodes.values()) {
@@ -414,6 +433,10 @@ public class StreamGraph extends StreamingPlan {
                }
        }
 
+       public static enum ResourceStrategy {
+               DEFAULT, ISOLATE, NEWGROUP
+       }
+
        /**
         * Object for representing loops in streaming programs.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 9672035..7137e3e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 public class StreamNode implements Serializable {
 
        private static final long serialVersionUID = 1L;
+       private static int currentSlotSharingIndex = 1;
 
        transient private StreamExecutionEnvironment env;
 
@@ -45,6 +46,8 @@ public class StreamNode implements Serializable {
        private Integer parallelism = null;
        private Long bufferTimeout = null;
        private String operatorName;
+       private Integer slotSharingID;
+       private boolean isolatedSlot = false;
 
        private transient StreamOperator<?, ?> operator;
        private List<OutputSelector<?>> outputSelectors;
@@ -68,6 +71,7 @@ public class StreamNode implements Serializable {
                this.operator = operator;
                this.outputSelectors = outputSelector;
                this.jobVertexClass = jobVertexClass;
+               this.slotSharingID = currentSlotSharingIndex;
        }
 
        public void addInEdge(StreamEdge inEdge) {
@@ -198,4 +202,16 @@ public class StreamNode implements Serializable {
                this.inputFormat = inputFormat;
        }
 
+       public int getSlotSharingID() {
+               return isolatedSlot ? -1 : slotSharingID;
+       }
+
+       public void startNewSlotSharingGroup() {
+               this.slotSharingID = ++currentSlotSharingIndex;
+       }
+
+       public void isolateSlot() {
+               isolatedSlot = true;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a33c118..1016fa0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.configuration.Configuration;
@@ -140,7 +141,7 @@ public class StreamingJobGraphGenerator {
                        List<StreamEdge> chainableOutputs = new 
ArrayList<StreamEdge>();
                        List<StreamEdge> nonChainableOutputs = new 
ArrayList<StreamEdge>();
 
-                       for (StreamEdge outEdge : 
streamGraph.getVertex(current).getOutEdges()) {
+                       for (StreamEdge outEdge : 
streamGraph.getStreamNode(current).getOutEdges()) {
                                if (isChainable(outEdge)) {
                                        chainableOutputs.add(outEdge);
                                } else {
@@ -168,7 +169,7 @@ public class StreamingJobGraphGenerator {
 
                                config.setChainStart();
                                config.setOutEdgesInOrder(transitiveOutEdges);
-                               
config.setOutEdges(streamGraph.getVertex(current).getOutEdges());
+                               
config.setOutEdges(streamGraph.getStreamNode(current).getOutEdges());
 
                                for (StreamEdge edge : transitiveOutEdges) {
                                        connect(startNode, edge);
@@ -194,7 +195,7 @@ public class StreamingJobGraphGenerator {
        }
 
        private String createChainedName(Integer vertexID, List<StreamEdge> 
chainedOutputs) {
-               String operatorName = 
streamGraph.getVertex(vertexID).getOperatorName();
+               String operatorName = 
streamGraph.getStreamNode(vertexID).getOperatorName();
                if (chainedOutputs.size() > 1) {
                        List<String> outputChainedNames = new 
ArrayList<String>();
                        for (StreamEdge chainable : chainedOutputs) {
@@ -216,7 +217,7 @@ public class StreamingJobGraphGenerator {
        private StreamConfig createProcessingVertex(Integer vertexID) {
 
                AbstractJobVertex jobVertex = new 
AbstractJobVertex(chainedNames.get(vertexID));
-               StreamNode vertex = streamGraph.getVertex(vertexID);
+               StreamNode vertex = streamGraph.getStreamNode(vertexID);
 
                jobVertex.setInvokableClass(vertex.getJobVertexClass());
 
@@ -246,7 +247,7 @@ public class StreamingJobGraphGenerator {
        private void setVertexConfig(Integer vertexID, StreamConfig config,
                        List<StreamEdge> chainableOutputs, List<StreamEdge> 
nonChainableOutputs) {
 
-               StreamNode vertex = streamGraph.getVertex(vertexID);
+               StreamNode vertex = streamGraph.getStreamNode(vertexID);
 
                config.setVertexID(vertexID);
                config.setBufferTimeout(vertex.getBufferTimeout());
@@ -317,6 +318,8 @@ public class StreamingJobGraphGenerator {
 
                return downStreamVertex.getInEdges().size() == 1
                                && outOperator != null
+                               && upStreamVertex.getSlotSharingID() == 
downStreamVertex.getSlotSharingID()
+                               && upStreamVertex.getSlotSharingID() != -1
                                && outOperator.getChainingStrategy() == 
ChainingStrategy.ALWAYS
                                && (headOperator.getChainingStrategy() == 
ChainingStrategy.HEAD || headOperator
                                                .getChainingStrategy() == 
ChainingStrategy.ALWAYS)
@@ -327,10 +330,21 @@ public class StreamingJobGraphGenerator {
        }
 
        private void setSlotSharing() {
-               SlotSharingGroup shareGroup = new SlotSharingGroup();
 
-               for (AbstractJobVertex vertex : jobVertices.values()) {
-                       vertex.setSlotSharingGroup(shareGroup);
+               Map<Integer, SlotSharingGroup> slotSharingGroups = new 
HashMap<Integer, SlotSharingGroup>();
+
+               for (Entry<Integer, AbstractJobVertex> entry : 
jobVertices.entrySet()) {
+
+                       int slotSharingID = 
streamGraph.getStreamNode(entry.getKey()).getSlotSharingID();
+
+                       if (slotSharingID != -1) {
+                               SlotSharingGroup group = 
slotSharingGroups.get(slotSharingID);
+                               if (group == null) {
+                                       group = new SlotSharingGroup();
+                                       slotSharingGroups.put(slotSharingID, 
group);
+                               }
+                               entry.getValue().setSlotSharingGroup(group);
+                       }
                }
 
                for (StreamLoop loop : streamGraph.getStreamLoops()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
index b688ea4..dfcdc8d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -53,7 +53,7 @@ public class WindowingOptimizer {
 
                for (Integer flattenerID : flatteners) {
                        // Flatteners should have exactly one input
-                       StreamNode input = 
streamGraph.getVertex(flattenerID).getInEdges().get(0)
+                       StreamNode input = 
streamGraph.getStreamNode(flattenerID).getInEdges().get(0)
                                        .getSourceVertex();
 
                        // Check whether the flatten is applied after a merge
@@ -98,9 +98,9 @@ public class WindowingOptimizer {
                for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : 
discretizers) {
                        boolean inMatching = false;
                        for (Tuple2<StreamDiscretizer<?>, List<Integer>> 
matching : matchingDiscretizers) {
-                               Set<Integer> discretizerInEdges = new 
HashSet<Integer>(streamGraph.getVertex(
+                               Set<Integer> discretizerInEdges = new 
HashSet<Integer>(streamGraph.getStreamNode(
                                                
discretizer.f0).getInEdgeIndices());
-                               Set<Integer> matchingInEdges = new 
HashSet<Integer>(streamGraph.getVertex(
+                               Set<Integer> matchingInEdges = new 
HashSet<Integer>(streamGraph.getStreamNode(
                                                
matching.f1.get(0)).getInEdgeIndices());
 
                                if (discretizer.f1.equals(matching.f0)
@@ -132,7 +132,7 @@ public class WindowingOptimizer {
        private static void replaceDiscretizer(StreamGraph streamGraph, Integer 
toReplaceID,
                        Integer replaceWithID) {
                // Convert to array to create a copy
-               List<StreamEdge> outEdges = new 
ArrayList<StreamEdge>(streamGraph.getVertex(toReplaceID)
+               List<StreamEdge> outEdges = new 
ArrayList<StreamEdge>(streamGraph.getStreamNode(toReplaceID)
                                .getOutEdges());
 
                int numOutputs = outEdges.size();
@@ -146,6 +146,6 @@ public class WindowingOptimizer {
                }
 
                // Remove the other discretizer
-               streamGraph.removeVertex(streamGraph.getVertex(toReplaceID));
+               
streamGraph.removeVertex(streamGraph.getStreamNode(toReplaceID));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 5cb3ec9..a361b7a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -113,7 +113,7 @@ public abstract class StreamOperator<IN, OUT> implements 
Serializable {
                                // Task already cancelled do nothing
                                return null;
                        }
-               }  catch (IllegalStateException e) {
+               } catch (IllegalStateException e) {
                        if (isRunning) {
                                throw new RuntimeException("Could not read next 
record due to: "
                                                + 
StringUtils.stringifyException(e));
@@ -198,6 +198,16 @@ public abstract class StreamOperator<IN, OUT> implements 
Serializable {
                return chainingStrategy;
        }
 
+       /**
+        * Defines the chaining scheme for the operator. By default 
<b>ALWAYS</b> is used,
+        * which means operators will be eagerly chained whenever possible, for
+        * maximal performance. It is generally a good practice to allow maximal
+        * chaining and increase operator parallelism. </p> When the strategy 
is set
+        * to <b>NEVER</b>, the operator will not be chained to the preceding 
or succeeding
+        * operators.</p> <b>HEAD</b> strategy marks a start of a new chain, so 
that the
+        * operator will not be chained to preceding operators, only succeding 
ones.
+        * 
+        */
        public static enum ChainingStrategy {
                ALWAYS, NEVER, HEAD
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 4228314..4b3543b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
@@ -89,7 +88,7 @@ public class CoStreamTest {
                                        public boolean filter(Tuple2<Integer, 
Integer> value) throws Exception {
                                                return true;
                                        }
-                               
}).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER).groupBy(new 
KeySelector<Tuple2<Integer, Integer>, Integer>() {
+                               }).disableChaining().groupBy(new 
KeySelector<Tuple2<Integer, Integer>, Integer>() {
 
                                        private static final long 
serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
new file mode 100644
index 0000000..94636f7
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.Test;
+
+public class SlotAllocationTest {
+
+       @SuppressWarnings("serial")
+       @Test
+       public void test() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(8);
+
+               FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
+
+                       @Override
+                       public boolean filter(Long value) throws Exception {
+
+                               return false;
+                       }
+               };
+
+               env.generateSequence(1, 
10).filter(dummyFilter).isolateResources().filter(dummyFilter)
+                               
.disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
+                               .startNewChain().print();
+
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+               List<AbstractJobVertex> vertices = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+
+               assertEquals(vertices.get(0).getSlotSharingGroup(), 
vertices.get(2).getSlotSharingGroup());
+               assertNotEquals(vertices.get(0).getSlotSharingGroup(), 
vertices.get(1)
+                               .getSlotSharingGroup());
+               assertNotEquals(vertices.get(2).getSlotSharingGroup(), 
vertices.get(3)
+                               .getSlotSharingGroup());
+               assertEquals(vertices.get(3).getSlotSharingGroup(), 
vertices.get(4).getSlotSharingGroup());
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index ca0c1dc..5986a30 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
@@ -132,7 +131,7 @@ public class SelfConnectionTest implements Serializable {
                        public String map(Integer value) throws Exception {
                                return "x " + value;
                        }
-               }).setChainingStrategy(StreamOperator.ChainingStrategy.ALWAYS);
+               });
 
                stringMap.connect(src).map(new CoMapFunction<String, Integer, 
String>() {
 
@@ -178,7 +177,7 @@ public class SelfConnectionTest implements Serializable {
 
                StreamExecutionEnvironment env = new TestStreamEnvironment(3, 
MEMORY_SIZE);
 
-               DataStream<Integer> src = env.fromElements(1, 3, 
5).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER);
+               DataStream<Integer> src = env.fromElements(1, 3, 
5).disableChaining();
 
                DataStream<String> stringMap = src.flatMap(new 
FlatMapFunction<Integer, String>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 41be9d4..ee2afe6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -95,9 +95,67 @@ class DataStream[T](javaStream: JavaStream[T]) {
         "parallelism.")
   }
   
-  def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
+  /**
+   * Turns off chaining for this operator so thread co-location will not be
+   * used as an optimization. </p> Chaining can be turned off for the whole
+   * job by {@link StreamExecutionEnvironment#disableOperatorChaning()}
+   * however it is not advised for performance considerations.
+   * 
+   */
+  def disableChaining(): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for 
operators.")
+    }
+    this
+  }
+  
+  /**
+   * Starts a new task chain beginning at this operator. This operator will
+   * not be chained (thread co-located for increased performance) to any
+   * previous tasks even if possible.
+   * 
+   */
+  def startNewChain(): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for 
operators.")
+    }
+    this
+  }
+  
+  /**
+   * Isolates the operator in its own resource group. This will cause the
+   * operator to grab as many task slots as its degree of parallelism. If
+   * there are no free resources available, the job will fail to start.
+   * All subsequent operators are assigned to the default resource group.
+   * 
+   */
+  def isolateResources(): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for 
operators.")
+    }
+    this
+  }
+  
+  /**
+   * By default all operators in a streaming job share the same resource
+   * group. Each resource group takes as many task manager slots as the
+   * maximum parallelism operator in that group. By calling this method, this
+   * operators starts a new resource group and all subsequent operators will
+   * be added to this group unless specified otherwise. Please note that
+   * local executions have by default as many available task slots as the
+   * environment parallelism, so in order to start a new resource group the
+   * degree of parallelism for the operators must be decreased from the
+   * default.
+   */
+  def startNewResourceGroup(): DataStream[T] = {
     javaStream match {
-      case ds: SingleOutputStreamOperator[_, _] => 
ds.setChainingStrategy(strategy)
+      case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
       case _ =>
         throw new UnsupportedOperationException("Only supported for 
operators.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index d14787c..699c38d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -100,7 +100,7 @@ object StreamCrossOperator {
 
     override def every(length: Long): CrossWindow[I1, I2] = {
       val graph = javaStream.getExecutionEnvironment().getStreamGraph()
-      val operator = graph.getVertex(javaStream.getId()).getOperator()
+      val operator = graph.getStreamNode(javaStream.getId()).getOperator()
       operator.asInstanceOf[CoStreamWindow[_,_,_]].setSlideSize(length)
       this
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca82b0cc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index c7716ed..15a8f1d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -122,6 +122,17 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.enableCheckpointing()
     this
   }
+  
+  /**
+   * Disables operator chaining for streaming operators. Operator chaining
+   * allows non-shuffle operations to be co-located in the same thread fully
+   * avoiding serialization and de-serialization.
+   * 
+   */
+  def disableOperatorChaning(): StreamExecutionEnvironment = {
+    javaEnv.disableOperatorChaning()
+    this
+  }
 
   /**
    * Sets the number of times that failed tasks are re-executed. A value of 
zero

Reply via email to