[FLINK-5134] [runtime] Aggregate ResourceSpec for chained operators when 
generating job graph

This closes #3455


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/980d072f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/980d072f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/980d072f

Branch: refs/heads/master
Commit: 980d072fa2546dbc10cf878cf29532b2d8bbca8a
Parents: 24408e1
Author: 淘江 <taojiang....@alibaba-inc.com>
Authored: Thu Mar 2 13:51:39 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/operators/Operator.java    |  12 +-
 .../api/common/operators/ResourceSpec.java      |   2 +-
 .../flink/api/java/operators/DataSink.java      |  90 ++++----
 .../api/java/operators/DeltaIteration.java      |  88 ++++----
 .../flink/api/java/operators/Operator.java      |  96 +++++----
 .../api/java/operators/OperatorTranslation.java |  12 +-
 .../flink/api/java/operator/OperatorTest.java   |  22 +-
 .../plantranslate/JobGraphGenerator.java        |  14 +-
 .../plantranslate/JobGraphGeneratorTest.java    | 209 +++++++++++++++++++
 .../flink/runtime/jobgraph/JobVertex.java       |  38 ++++
 .../api/datastream/DataStreamSink.java          |  79 +++----
 .../datastream/SingleOutputStreamOperator.java  |  78 +++----
 .../flink/streaming/api/graph/StreamGraph.java  |  10 +-
 .../api/graph/StreamGraphGenerator.java         |  10 +-
 .../flink/streaming/api/graph/StreamNode.java   |   4 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  26 +++
 .../transformations/StreamTransformation.java   |   4 +-
 .../flink/streaming/api/DataStreamTest.java     |  68 +++---
 .../graph/StreamingJobGraphGeneratorTest.java   | 159 +++++++++++++-
 19 files changed, 739 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 1905555..6d906f2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -28,8 +28,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Visitable;
 
-import javax.annotation.Nullable;
-
 /**
 * Abstract base class for all operators. An operator is a source, sink, or it 
applies an operation to
 * one or more inputs, producing a result.
@@ -47,11 +45,9 @@ public abstract class Operator<OUT> implements 
Visitable<Operator<?>> {
                
        private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;  // the 
number of parallel instances to use
 
-       @Nullable
-       private ResourceSpec minResources;          // the minimum resource of 
the contract instance.
+       private ResourceSpec minResources = ResourceSpec.DEFAULT;          // 
the minimum resource of the contract instance.
 
-       @Nullable
-       private ResourceSpec preferredResources;    // the preferred resource 
of the contract instance.
+       private ResourceSpec preferredResources = ResourceSpec.DEFAULT;    // 
the preferred resource of the contract instance.
 
        /**
         * The return type of the user function.
@@ -199,7 +195,6 @@ public abstract class Operator<OUT> implements 
Visitable<Operator<?>> {
         *
         * @return The minimum resources of this operator.
         */
-       @Nullable
        @PublicEvolving
        public ResourceSpec getMinResources() {
                return this.minResources;
@@ -211,7 +206,6 @@ public abstract class Operator<OUT> implements 
Visitable<Operator<?>> {
         *
         * @return The preferred resource of this operator.
         */
-       @Nullable
        @PublicEvolving
        public ResourceSpec getPreferredResources() {
                return this.preferredResources;
@@ -225,7 +219,7 @@ public abstract class Operator<OUT> implements 
Visitable<Operator<?>> {
         * @param preferredResources The preferred resource of this operator.
         */
        @PublicEvolving
-       public void setResource(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+       public void setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
                this.minResources = minResources;
                this.preferredResources = preferredResources;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 0ea289a..cd3e5ad 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -44,7 +44,7 @@ public class ResourceSpec implements Serializable {
 
        private static final long serialVersionUID = 1L;
 
-       public static final ResourceSpec UNKNOWN = new ResourceSpec(0, 0, 0, 0, 
0);
+       public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 
0);
 
        /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1 */
        private final double cpuCores;

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 369e013..fd8190c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -52,9 +52,9 @@ public class DataSink<T> {
        
        private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-       private ResourceSpec minResources = ResourceSpec.UNKNOWN;
+       private ResourceSpec minResources = ResourceSpec.DEFAULT;
 
-       private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
+       private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
 
        private Configuration parameters;
 
@@ -306,49 +306,45 @@ public class DataSink<T> {
                return this.preferredResources;
        }
 
-//     
---------------------------------------------------------------------------
-//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
-//      The setters are hence commented out at this point.
-//     
---------------------------------------------------------------------------
-//
-//     /**
-//      * Sets the minimum and preferred resources for this data sink. This 
overrides the default empty resource.
-//      *      The minimum resource must be satisfied and the preferred 
resource specifies the upper bound
-//      * for dynamic resource resize.
-//      *
-//      * @param minResources The minimum resource for this data sink.
-//      * @param preferredResources The preferred resource for this data sink.
-//      * @return The data sink with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public DataSink<T> setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
-//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
-//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
-//             Preconditions.checkArgument(minResources.isValid() && 
-//                             preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
-//                             "The values in resource must be not less than 0 
and the preferred " +
-//                             "resource must be greater than the min 
resource.");
-//
-//             this.minResources = minResources;
-//             this.preferredResources = preferredResources;
-//
-//             return this;
-//     }
-//
-//     /**
-//      * Sets the resources for this data sink. This overrides the default 
resource profile.
-//      *
-//      * @param resources The resources for this data sink.
-//      * @return The data sink with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public DataSink<T> setResources(ResourceSpec resources) {
-//             Preconditions.checkNotNull(resources, "The resources must be 
not null.");
-//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
-//
-//             this.minResources = resources;
-//             this.preferredResources = resources;
-//
-//             return this;
-//     }
+       //      
---------------------------------------------------------------------------
+       //       Fine-grained resource profiles are an incomplete 
work-in-progress feature
+       //       The setters are hence private at this point.
+       //      
---------------------------------------------------------------------------
+
+       /**
+        * Sets the minimum and preferred resources for this data sink. and the 
lower and upper resource limits
+        * will be considered in resource resize feature for future plan.
+        *
+        * @param minResources The minimum resources for this data sink.
+        * @param preferredResources The preferred resources for this data sink.
+        * @return The data sink with set minimum and preferred resources.
+        */
+       private DataSink<T> setResources(ResourceSpec minResources, 
ResourceSpec preferredResources) {
+               Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+               Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+               Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+                               "The values in resources must be not less than 
0 and the preferred resources must be greater than the min resources.");
+
+               this.minResources = minResources;
+               this.preferredResources = preferredResources;
+
+               return this;
+       }
+
+       /**
+        * Sets the resources for this data sink, and the minimum and preferred 
resources are the same by default.
+        *
+        * @param resources The resources for this data sink.
+        * @return The data sink with set minimum and preferred resources.
+        */
+       private DataSink<T> setResources(ResourceSpec resources) {
+               Preconditions.checkNotNull(resources, "The resources must be 
not null.");
+               Preconditions.checkArgument(resources.isValid(), "The values in 
resources must be not less than 0.");
+
+               this.minResources = resources;
+               this.preferredResources = resources;
+
+               return this;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index 3d327e9..61f83b1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -64,9 +64,9 @@ public class DeltaIteration<ST, WT> {
        
        private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-       private ResourceSpec minResources = ResourceSpec.UNKNOWN;
+       private ResourceSpec minResources = ResourceSpec.DEFAULT;
 
-       private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
+       private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
        
        private boolean solutionSetUnManaged;
 
@@ -197,49 +197,47 @@ public class DeltaIteration<ST, WT> {
                return parallelism;
        }
 
-//     
---------------------------------------------------------------------------
-//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
-//      The setters are hence commented out at this point.
-//     
---------------------------------------------------------------------------
-//
-//     /**
-//      * Sets the minimum and preferred resources for the iteration. This 
overrides the default empty resource.
-//      * The lower and upper resource limits will be considered in dynamic 
resource resize feature for future plan.
-//      *
-//      * @param minResources The minimum resource for the iteration.
-//      * @param preferredResources The preferred resource for the iteration.
-//      * @return The iteration with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public DeltaIteration<ST, WT> setResource(ResourceSpec minResources, 
ResourceSpec preferredResources) {
-//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
-//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
-//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
-//                             "The values in resources must be not less than 
0 and the preferred resources must be greater than the min resources.");
-//
-//             this.minResources = minResources;
-//             this.preferredResources = preferredResources;
-//
-//             return this;
-//     }
-//
-//     /**
-//      * Sets the resource for the iteration, and the minimum and preferred 
resources are the same by default.
-//      *      The lower and upper resource limits will be considered in 
dynamic resource resize feature for future plan.
-//      *
-//      * @param resources The resource for the iteration.
-//      * @return The iteration with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public DeltaIteration<ST, WT> setResource(ResourceSpec resources) {
-//             Preconditions.checkNotNull(resources, "The resources must be 
not null.");
-//             Preconditions.checkArgument(resources.isValid(), "The values in 
resource must be not less than 0.");
-//
-//             this.minResources = resources;
-//             this.preferredResources = resources;
-//
-//             return this;
-//     }
+       //      
---------------------------------------------------------------------------
+       //       Fine-grained resource profiles are an incomplete 
work-in-progress feature
+       //       The setters are hence private at this point.
+       //      
---------------------------------------------------------------------------
+
+       /**
+        * Sets the minimum and preferred resources for the iteration. This 
overrides the default resources.
+        * The lower and upper resource limits will be considered in dynamic 
resource resize feature for future plan.
+        *
+        * @param minResources The minimum resources for the iteration.
+        * @param preferredResources The preferred resources for the iteration.
+        * @return The iteration with set minimum and preferred resources.
+        */
+       private DeltaIteration<ST, WT> setResources(ResourceSpec minResources, 
ResourceSpec preferredResources) {
+               Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+               Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+               Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+                               "The values in resources must be not less than 
0 and the preferred resources must be greater than the min resources.");
+
+               this.minResources = minResources;
+               this.preferredResources = preferredResources;
+
+               return this;
+       }
+
+       /**
+        * Sets the resources for the iteration, and the minimum and preferred 
resources are the same by default.
+        *      The lower and upper resource limits will be considered in 
dynamic resource resize feature for future plan.
+        *
+        * @param resources The resources for the iteration.
+        * @return The iteration with set minimum and preferred resources.
+        */
+       private DeltaIteration<ST, WT> setResources(ResourceSpec resources) {
+               Preconditions.checkNotNull(resources, "The resources must be 
not null.");
+               Preconditions.checkArgument(resources.isValid(), "The values in 
resources must be not less than 0.");
+
+               this.minResources = resources;
+               this.preferredResources = resources;
+
+               return this;
+       }
 
        /**
         * Gets the minimum resources from this iteration. If no minimum 
resources have been set,

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 6ae59dd..e496c62 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -39,9 +39,9 @@ public abstract class Operator<OUT, O extends Operator<OUT, 
O>> extends DataSet<
        
        protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-       protected ResourceSpec minResources = ResourceSpec.UNKNOWN;
+       protected ResourceSpec minResources = ResourceSpec.DEFAULT;
 
-       protected ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
+       protected ResourceSpec preferredResources = ResourceSpec.DEFAULT;
 
 
        protected Operator(ExecutionEnvironment context, TypeInformation<OUT> 
resultType) {
@@ -130,51 +130,49 @@ public abstract class Operator<OUT, O extends 
Operator<OUT, O>> extends DataSet<
                return returnType;
        }
 
-//     
---------------------------------------------------------------------------
-//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
-//      The setters are hence commented out at this point.
-//     
---------------------------------------------------------------------------
-//
-//     /**
-//      * Sets the minimum and preferred resources for this operator. This 
overrides the default empty resource.
-//      * The lower and upper resource limits will be considered in dynamic 
resource resize feature for future plan.
-//      *
-//      * @param minResources The minimum resource for this operator.
-//      * @param preferredResources The preferred resource for this operator.
-//      * @return The operator with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public O setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
-//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
-//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
-//
-//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
-//                             "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-//
-//             this.minResources = minResources;
-//             this.preferredResources = preferredResources;
-//
-//             @SuppressWarnings("unchecked")
-//             O returnType = (O) this;
-//             return returnType;
-//     }
-//
-//     /**
-//      * Sets the resources for this operator. This overrides the default 
minimum and preferred resources.
-//      *
-//      * @param resources The resource for this operator.
-//      * @return The operator with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public O setResources(ResourceSpec resources) {
-//             Preconditions.checkNotNull(resources, "The resource must be not 
null.");
-//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
-//
-//             this.minResources = resources;
-//             this.preferredResources = resources;
-//
-//             @SuppressWarnings("unchecked")
-//             O returnType = (O) this;
-//             return returnType;
-//     }
+       //      
---------------------------------------------------------------------------
+       //       Fine-grained resource profiles are an incomplete 
work-in-progress feature
+       //       The setters are hence private at this point.
+       //      
---------------------------------------------------------------------------
+
+       /**
+        * Sets the minimum and preferred resources for this operator. This 
overrides the default resources.
+        * The lower and upper resource limits will be considered in dynamic 
resource resize feature for future plan.
+        *
+        * @param minResources The minimum resources for this operator.
+        * @param preferredResources The preferred resources for this operator.
+        * @return The operator with set minimum and preferred resources.
+        */
+       private O setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+               Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+               Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+
+               Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+                               "The values in resources must be not less than 
0 and the preferred resources must be greater than the min resources.");
+
+               this.minResources = minResources;
+               this.preferredResources = preferredResources;
+
+               @SuppressWarnings("unchecked")
+               O returnType = (O) this;
+               return returnType;
+       }
+
+       /**
+        * Sets the resources for this operator. This overrides the default 
minimum and preferred resources.
+        *
+        * @param resources The resources for this operator.
+        * @return The operator with set minimum and preferred resources.
+        */
+       private O setResources(ResourceSpec resources) {
+               Preconditions.checkNotNull(resources, "The resources must be 
not null.");
+               Preconditions.checkArgument(resources.isValid(), "The values in 
resources must be not less than 0.");
+
+               this.minResources = resources;
+               this.preferredResources = resources;
+
+               @SuppressWarnings("unchecked")
+               O returnType = (O) this;
+               return returnType;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 3bffd8b..cdbec71 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -64,7 +64,7 @@ public class OperatorTranslation {
                // translate the sink itself and connect it to the input
                GenericDataSinkBase<T> translatedSink = 
sink.translateToDataFlow(input);
 
-               translatedSink.setResource(sink.getMinResources(), 
sink.getPreferredResources());
+               translatedSink.setResources(sink.getMinResources(), 
sink.getPreferredResources());
 
                return translatedSink;
        }
@@ -95,28 +95,28 @@ public class OperatorTranslation {
                if (dataSet instanceof DataSource) {
                        DataSource<T> dataSource = (DataSource<T>) dataSet;
                        dataFlowOp = dataSource.translateToDataFlow();
-                       dataFlowOp.setResource(dataSource.getMinResources(), 
dataSource.getPreferredResources());
+                       dataFlowOp.setResources(dataSource.getMinResources(), 
dataSource.getPreferredResources());
                }
                else if (dataSet instanceof SingleInputOperator) {
                        SingleInputOperator<?, ?, ?> singleInputOperator = 
(SingleInputOperator<?, ?, ?>) dataSet;
                        dataFlowOp = 
translateSingleInputOperator(singleInputOperator);
-                       
dataFlowOp.setResource(singleInputOperator.getMinResources(), 
singleInputOperator.getPreferredResources());
+                       
dataFlowOp.setResources(singleInputOperator.getMinResources(), 
singleInputOperator.getPreferredResources());
                }
                else if (dataSet instanceof TwoInputOperator) {
                        TwoInputOperator<?, ?, ?, ?> twoInputOperator = 
(TwoInputOperator<?, ?, ?, ?>) dataSet;
                        dataFlowOp = 
translateTwoInputOperator(twoInputOperator);
-                       
dataFlowOp.setResource(twoInputOperator.getMinResources(), 
twoInputOperator.getPreferredResources());
+                       
dataFlowOp.setResources(twoInputOperator.getMinResources(), 
twoInputOperator.getPreferredResources());
                }
                else if (dataSet instanceof BulkIterationResultSet) {
                        BulkIterationResultSet<?> bulkIterationResultSet = 
(BulkIterationResultSet<?>) dataSet;
                        dataFlowOp = 
translateBulkIteration(bulkIterationResultSet);
-                       
dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResources(),
+                       
dataFlowOp.setResources(bulkIterationResultSet.getIterationHead().getMinResources(),
                                        
bulkIterationResultSet.getIterationHead().getPreferredResources());
                }
                else if (dataSet instanceof DeltaIterationResultSet) {
                        DeltaIterationResultSet<?, ?> deltaIterationResultSet = 
(DeltaIterationResultSet<?, ?>) dataSet;
                        dataFlowOp = 
translateDeltaIteration(deltaIterationResultSet);
-                       
dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResources(),
+                       
dataFlowOp.setResources(deltaIterationResultSet.getIterationHead().getMinResources(),
                                        
deltaIterationResultSet.getIterationHead().getPreferredResources());
                }
                else if (dataSet instanceof 
DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof 
DeltaIteration.WorksetPlaceHolder) {

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
index 992acc9..3de254b 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.junit.Test;
 
+import java.lang.reflect.Method;
+
 import static org.junit.Assert.assertEquals;
 
 public class OperatorTest {
@@ -46,19 +48,21 @@ public class OperatorTest {
                assertEquals(parallelism, operator.getParallelism());
        }
 
-       /*
        @Test
-       public void testConfigurationOfResource() {
+       public void testConfigurationOfResource() throws Exception{
                Operator operator = new MockOperator();
 
-               // verify explicit change in resource
-               ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0);
-               ResourceSpec preferredResource = new ResourceSpec(2.0, 200, 0, 
0, 0);
-               operator.setResource(minResource, preferredResource);
+               Method opMethod = 
Operator.class.getDeclaredMethod("setResources", ResourceSpec.class, 
ResourceSpec.class);
+               opMethod.setAccessible(true);
+
+               // verify explicit change in resources
+               ResourceSpec minResources = new ResourceSpec(1.0, 100);
+               ResourceSpec preferredResources = new ResourceSpec(2.0, 200);
+               opMethod.invoke(operator, minResources, preferredResources);
 
-               assertEquals(minResource, operator.getMinResource());
-               assertEquals(preferredResource, 
operator.getPreferredResource());
-       }*/
+               assertEquals(minResources, operator.getMinResources());
+               assertEquals(preferredResources, 
operator.getPreferredResources());
+       }
 
        private class MockOperator extends Operator {
                public MockOperator() {

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index bbc944e..a407bfe 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -538,6 +538,10 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                                        } else {
                                                container.setName("CHAIN " + 
containerTaskName + " -> " + chainedTask.getTaskName());
                                        }
+
+                                       //update resource of container task
+                                       
container.setResources(container.getMinResources().merge(node.getMinResources()),
+                                                       
container.getPreferredResources().merge(node.getPreferredResources()));
                                        
                                        
this.chainedTasksInSequence.add(chainedTask);
                                        return;
@@ -837,6 +841,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                } else {
                        // create task vertex
                        vertex = new JobVertex(taskName);
+                       vertex.setResources(node.getMinResources(), 
node.getPreferredResources());
                        vertex.setInvokableClass((this.currentIteration != null 
&& node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);
                        
                        config = new TaskConfig(vertex.getConfiguration());
@@ -862,6 +867,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                final DriverStrategy ds = node.getDriverStrategy();
                final JobVertex vertex = new JobVertex(taskName);
                final TaskConfig config = new 
TaskConfig(vertex.getConfiguration());
+               vertex.setResources(node.getMinResources(), 
node.getPreferredResources());
                vertex.setInvokableClass( (this.currentIteration != null && 
node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);
                
                // set user code
@@ -890,6 +896,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                final InputFormatVertex vertex = new 
InputFormatVertex(node.getNodeName());
                final TaskConfig config = new 
TaskConfig(vertex.getConfiguration());
 
+               vertex.setResources(node.getMinResources(), 
node.getPreferredResources());
                vertex.setInvokableClass(DataSourceTask.class);
                
vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
 
@@ -905,6 +912,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                final OutputFormatVertex vertex = new 
OutputFormatVertex(node.getNodeName());
                final TaskConfig config = new 
TaskConfig(vertex.getConfiguration());
 
+               vertex.setResources(node.getMinResources(), 
node.getPreferredResources());
                vertex.setInvokableClass(DataSinkTask.class);
                
vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
                
@@ -967,6 +975,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                        // everything else happens in the post visit, after the 
input (the initial partial solution)
                        // is connected.
                        headVertex = new JobVertex("PartialSolution 
("+iteration.getNodeName()+")");
+                       headVertex.setResources(iteration.getMinResources(), 
iteration.getPreferredResources());
                        headVertex.setInvokableClass(IterationHeadTask.class);
                        headConfig = new 
TaskConfig(headVertex.getConfiguration());
                        headConfig.setDriver(NoOpDriver.class);
@@ -1035,6 +1044,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                        // everything else happens in the post visit, after the 
input (the initial partial solution)
                        // is connected.
                        headVertex = new 
JobVertex("IterationHead("+iteration.getNodeName()+")");
+                       headVertex.setResources(iteration.getMinResources(), 
iteration.getPreferredResources());
                        headVertex.setInvokableClass(IterationHeadTask.class);
                        headConfig = new 
TaskConfig(headVertex.getConfiguration());
                        headConfig.setDriver(NoOpDriver.class);
@@ -1275,6 +1285,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                
                // --------------------------- create the sync task 
---------------------------
                final JobVertex sync = new JobVertex("Sync(" + 
bulkNode.getNodeName() + ")");
+               sync.setResources(bulkNode.getMinResources(), 
bulkNode.getPreferredResources());
                sync.setInvokableClass(IterationSynchronizationSinkTask.class);
                sync.setParallelism(1);
                sync.setMaxParallelism(1);
@@ -1412,6 +1423,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                final TaskConfig syncConfig;
                {
                        final JobVertex sync = new JobVertex("Sync (" + 
iterNode.getNodeName() + ")");
+                       sync.setResources(iterNode.getMinResources(), 
iterNode.getPreferredResources());
                        
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
                        sync.setParallelism(1);
                        sync.setMaxParallelism(1);
@@ -1468,7 +1480,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                                
                                if (hasWorksetTail) {
                                        
nextWorksetVertex.setInvokableClass(IterationTailTask.class);
-                                       
+
                                        
worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
new file mode 100644
index 0000000..d8ee80b
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.operators.Operator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertTrue;
+
+public class JobGraphGeneratorTest {
+
+       /**
+        * Verifies that the resources are merged correctly for chained 
operators when
+        * generating job graph
+        */
+       @Test
+       public void testResourcesForChainedOperators() throws Exception {
+               ResourceSpec resource1 = new ResourceSpec(0.1, 100);
+               ResourceSpec resource2 = new ResourceSpec(0.2, 200);
+               ResourceSpec resource3 = new ResourceSpec(0.3, 300);
+               ResourceSpec resource4 = new ResourceSpec(0.4, 400);
+               ResourceSpec resource5 = new ResourceSpec(0.5, 500);
+               ResourceSpec resource6 = new ResourceSpec(0.6, 600);
+               ResourceSpec resource7 = new ResourceSpec(0.7, 700);
+
+               Method opMethod = 
Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
+               opMethod.setAccessible(true);
+
+               Method sinkMethod = 
DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
+               sinkMethod.setAccessible(true);
+
+               MapFunction<Long, Long> mapFunction = new MapFunction<Long, 
Long>() {
+                       @Override
+                       public Long map(Long value) throws Exception {
+                               return value;
+                       }
+               };
+
+               FilterFunction<Long> filterFunction = new 
FilterFunction<Long>() {
+                       @Override
+                       public boolean filter(Long value) throws Exception {
+                               return false;
+                       }
+               };
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Long> input = env.fromElements(1L, 2L, 3L);
+               opMethod.invoke(input, resource1);
+
+               DataSet<Long> map1 = input.map(mapFunction);
+               opMethod.invoke(map1, resource2);
+
+               // CHAIN(Source -> Map -> Filter)
+               DataSet<Long> filter1 = map1.filter(filterFunction);
+               opMethod.invoke(filter1, resource3);
+
+               IterativeDataSet<Long> startOfIteration = filter1.iterate(10);
+               opMethod.invoke(startOfIteration, resource4);
+
+               DataSet<Long> map2 = startOfIteration.map(mapFunction);
+               opMethod.invoke(map2, resource5);
+
+               // CHAIN(Map -> Filter)
+               DataSet<Long> feedback = map2.filter(filterFunction);
+               opMethod.invoke(feedback, resource6);
+
+               DataSink<Long> sink = 
startOfIteration.closeWith(feedback).output(new DiscardingOutputFormat<Long>());
+               sinkMethod.invoke(sink, resource7);
+
+               Plan plan = env.createProgramPlan();
+               Optimizer pc = new Optimizer(new Configuration());
+               OptimizedPlan op = pc.compile(plan);
+
+               JobGraphGenerator jgg = new JobGraphGenerator();
+               JobGraph jobGraph = jgg.compileJobGraph(op);
+
+               JobVertex sourceMapFilterVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+               JobVertex iterationHeadVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+               JobVertex feedbackVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(2);
+               JobVertex sinkVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(3);
+               JobVertex iterationSyncVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(4);
+
+               
assertTrue(sourceMapFilterVertex.getMinResources().equals(resource1.merge(resource2).merge(resource3)));
+               
assertTrue(iterationHeadVertex.getPreferredResources().equals(resource4));
+               
assertTrue(feedbackVertex.getMinResources().equals(resource5.merge(resource6)));
+               
assertTrue(sinkVertex.getPreferredResources().equals(resource7));
+               
assertTrue(iterationSyncVertex.getMinResources().equals(resource4));
+       }
+
+       /**
+        * Verifies that the resources are set onto each job vertex correctly 
when generating job graph
+        * which covers the delta iteration case
+        */
+       @Test
+       public void testResourcesForDeltaIteration() throws Exception{
+               ResourceSpec resource1 = new ResourceSpec(0.1, 100);
+               ResourceSpec resource2 = new ResourceSpec(0.2, 200);
+               ResourceSpec resource3 = new ResourceSpec(0.3, 300);
+               ResourceSpec resource4 = new ResourceSpec(0.4, 400);
+               ResourceSpec resource5 = new ResourceSpec(0.5, 500);
+               ResourceSpec resource6 = new ResourceSpec(0.6, 600);
+
+               Method opMethod = 
Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
+               opMethod.setAccessible(true);
+
+               Method deltaMethod = 
DeltaIteration.class.getDeclaredMethod("setResources", ResourceSpec.class);
+               deltaMethod.setAccessible(true);
+
+               Method sinkMethod = 
DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
+               sinkMethod.setAccessible(true);
+
+               MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> mapFunction 
= new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+                       @Override
+                       public Tuple2<Long, Long> map(Tuple2<Long, Long> value) 
throws Exception {
+                               return value;
+                       }
+               };
+
+               FilterFunction<Tuple2<Long, Long>> filterFunction = new 
FilterFunction<Tuple2<Long, Long>>() {
+                       @Override
+                       public boolean filter(Tuple2<Long, Long> value) throws 
Exception {
+                               return false;
+                       }
+               };
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<Long, Long>> input = env.fromElements(new 
Tuple2<>(1L, 2L));
+               opMethod.invoke(input, resource1);
+
+               // CHAIN(Map -> Filter)
+               DataSet<Tuple2<Long, Long>> map = input.map(mapFunction);
+               opMethod.invoke(map, resource2);
+
+               DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration = map.iterateDelta(map, 100, 0).
+                               registerAggregator("test", new 
LongSumAggregator());
+               deltaMethod.invoke(iteration, resource3);
+
+               DataSet<Tuple2<Long, Long>> delta = 
iteration.getWorkset().map(mapFunction);
+               opMethod.invoke(delta, resource4);
+
+               DataSet<Tuple2<Long, Long>> feedback = 
delta.filter(filterFunction);
+               opMethod.invoke(feedback, resource5);
+
+               DataSink<Tuple2<Long, Long>> sink = iteration.closeWith(delta, 
feedback).
+                               output(new DiscardingOutputFormat<Tuple2<Long, 
Long>>());
+               sinkMethod.invoke(sink, resource6);
+
+               Plan plan = env.createProgramPlan();
+               Optimizer pc = new Optimizer(new Configuration());
+               OptimizedPlan op = pc.compile(plan);
+
+               JobGraphGenerator jgg = new JobGraphGenerator();
+               JobGraph jobGraph = jgg.compileJobGraph(op);
+
+               JobVertex sourceMapVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+               JobVertex iterationHeadVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+               JobVertex deltaVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(2);
+               JobVertex iterationTailVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(3);
+               JobVertex feedbackVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(4);
+               JobVertex sinkVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(5);
+               JobVertex iterationSyncVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(6);
+
+               
assertTrue(sourceMapVertex.getMinResources().equals(resource1.merge(resource2)));
+               
assertTrue(iterationHeadVertex.getPreferredResources().equals(resource3));
+               assertTrue(deltaVertex.getMinResources().equals(resource4));
+               // the iteration tail task will be scheduled in the same 
instance with iteration head, and currently not set resources.
+               
assertTrue(iterationTailVertex.getPreferredResources().equals(ResourceSpec.DEFAULT));
+               assertTrue(feedbackVertex.getMinResources().equals(resource5));
+               
assertTrue(sinkVertex.getPreferredResources().equals(resource6));
+               
assertTrue(iterationSyncVertex.getMinResources().equals(resource3));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 260bd74..1180db4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -31,6 +32,8 @@ import org.apache.flink.util.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The base class for job vertexes.
  */
@@ -61,6 +64,12 @@ public class JobVertex implements java.io.Serializable {
        /** Maximum number of subtasks to split this taks into a runtime. */
        private int maxParallelism = -1;
 
+       /** The minimum resource of the vertex */
+       private ResourceSpec minResources = ResourceSpec.DEFAULT;
+
+       /** The preferred resource of the vertex */
+       private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
+
        /** Custom configuration passed to the assigned task at runtime. */
        private Configuration configuration;
 
@@ -278,6 +287,35 @@ public class JobVertex implements java.io.Serializable {
                this.maxParallelism = maxParallelism;
        }
 
+       /**
+        * Gets the minimum resource for the task.
+        *
+        * @return The minimum resource for the task.
+        */
+       public ResourceSpec getMinResources() {
+               return minResources;
+       }
+
+       /**
+        * Gets the preferred resource for the task.
+        *
+        * @return The preferred resource for the task.
+        */
+       public ResourceSpec getPreferredResources() {
+               return preferredResources;
+       }
+
+       /**
+        * Sets the minimum and preferred resources for the task.
+        *
+        * @param minResources The minimum resource for the task.
+        * @param preferredResources The preferred resource for the task.
+        */
+       public void setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+               this.minResources = checkNotNull(minResources);
+               this.preferredResources = checkNotNull(preferredResources);
+       }
+
        public InputSplitSource<?> getInputSplitSource() {
                return inputSplitSource;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 39d81c6..b0e78d7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -20,9 +20,11 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A Stream Sink. This is used for emitting elements from a streaming topology.
@@ -113,45 +115,44 @@ public class DataStreamSink<T> {
                return this;
        }
 
-//     
---------------------------------------------------------------------------
-//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
-//      The setters are hence commented out at this point.
-//     
---------------------------------------------------------------------------
-//     /**
-//      * Sets the minimum and preferred resources for this sink, and the 
lower and upper resource limits will
-//      * be considered in resource resize feature for future plan.
-//      *
-//      * @param minResources The minimum resources for this sink.
-//      * @param preferredResources The preferred resources for this sink
-//      * @return The sink with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public DataStreamSink<T> setResources(ResourceSpec minResources, 
ResourceSpec preferredResources) {
-//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
-//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
-//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
-//                             "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-//
-//             transformation.setResources(minResources, preferredResources);
-//
-//             return this;
-//     }
-//
-//     /**
-//      * Sets the resource for this sink, the minimum and preferred resources 
are the same by default.
-//      *
-//      * @param resources The resource for this sink.
-//      * @return The sink with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public DataStreamSink<T> setResources(ResourceSpec resources) {
-//             Preconditions.checkNotNull(resources, "The resource must be not 
null.");
-//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
-//
-//             transformation.setResources(resources, resources);
-//
-//             return this;
-//     }
+       //      
---------------------------------------------------------------------------
+       //       Fine-grained resource profiles are an incomplete 
work-in-progress feature
+       //       The setters are hence private at this point.
+       //      
---------------------------------------------------------------------------
+
+       /**
+        * Sets the minimum and preferred resources for this sink, and the 
lower and upper resource limits will
+        * be considered in resource resize feature for future plan.
+        *
+        * @param minResources The minimum resources for this sink.
+        * @param preferredResources The preferred resources for this sink
+        * @return The sink with set minimum and preferred resources.
+        */
+       private DataStreamSink<T> setResources(ResourceSpec minResources, 
ResourceSpec preferredResources) {
+               Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+               Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+               Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+                               "The values in resources must be not less than 
0 and the preferred resources must be greater than the min resources.");
+
+               transformation.setResources(minResources, preferredResources);
+
+               return this;
+       }
+
+       /**
+        * Sets the resources for this sink, the minimum and preferred 
resources are the same by default.
+        *
+        * @param resources The resources for this sink.
+        * @return The sink with set minimum and preferred resources.
+        */
+       private DataStreamSink<T> setResources(ResourceSpec resources) {
+               Preconditions.checkNotNull(resources, "The resources must be 
not null.");
+               Preconditions.checkArgument(resources.isValid(), "The values in 
resources must be not less than 0.");
+
+               transformation.setResources(resources, resources);
+
+               return this;
+       }
 
        /**
         * Turns off chaining for this operator so thread co-location will not 
be

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 859c6d5..f1d5e3a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -154,45 +155,44 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
                return this;
        }
 
-//     
---------------------------------------------------------------------------
-//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
-//      The setters are hence commented out at this point.
-//     
---------------------------------------------------------------------------
-//     /**
-//      * Sets the minimum and preferred resources for this operator, and the 
lower and upper resource limits will
-//      * be considered in dynamic resource resize feature for future plan.
-//      *
-//      * @param minResources The minimum resources for this operator.
-//      * @param preferredResources The preferred resources for this operator.
-//      * @return The operator with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public SingleOutputStreamOperator<T> setResources(ResourceSpec 
minResources, ResourceSpec preferredResources) {
-//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
-//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
-//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && minResource.lessThanOrEqual(preferredResources),
-//                             "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-//
-//             transformation.setResources(minResources, preferredResources);
-//
-//             return this;
-//     }
-//
-//     /**
-//      * Sets the resources for this operator, the minimum and preferred 
resources are the same by default.
-//      *
-//      * @param resources The resources for this operator.
-//      * @return The operator with set minimum and preferred resources.
-//      */
-//     @PublicEvolving
-//     public SingleOutputStreamOperator<T> setResources(ResourceSpec 
resources) {
-//             Preconditions.checkNotNull(resources, "The resource must be not 
null.");
-//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
-//
-//             transformation.setResources(resources, resources);
-//
-//             return this;
-//     }
+       //      
---------------------------------------------------------------------------
+       //       Fine-grained resource profiles are an incomplete 
work-in-progress feature
+       //       The setters are hence private at this point.
+       //      
---------------------------------------------------------------------------
+
+       /**
+        * Sets the minimum and preferred resources for this operator, and the 
lower and upper resource limits will
+        * be considered in dynamic resource resize feature for future plan.
+        *
+        * @param minResources The minimum resources for this operator.
+        * @param preferredResources The preferred resources for this operator.
+        * @return The operator with set minimum and preferred resources.
+        */
+       private SingleOutputStreamOperator<T> setResources(ResourceSpec 
minResources, ResourceSpec preferredResources) {
+               Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+               Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+               Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+                               "The values in resources must be not less than 
0 and the preferred resources must be greater than the min resources.");
+
+               transformation.setResources(minResources, preferredResources);
+
+               return this;
+       }
+
+       /**
+        * Sets the resources for this operator, the minimum and preferred 
resources are the same by default.
+        *
+        * @param resources The resources for this operator.
+        * @return The operator with set minimum and preferred resources.
+        */
+       private SingleOutputStreamOperator<T> setResources(ResourceSpec 
resources) {
+               Preconditions.checkNotNull(resources, "The resources must be 
not null.");
+               Preconditions.checkArgument(resources.isValid(), "The values in 
resources must be not less than 0.");
+
+               transformation.setResources(resources, resources);
+
+               return this;
+       }
 
        private boolean canBeParallel() {
                return !nonParallel;

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a87e63d..c1775e4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -414,9 +414,9 @@ public class StreamGraph extends StreamingPlan {
                }
        }
 
-       public void setResource(int vertexID, ResourceSpec minResource, 
ResourceSpec preferredResource) {
+       public void setResources(int vertexID, ResourceSpec minResources, 
ResourceSpec preferredResources) {
                if (getStreamNode(vertexID) != null) {
-                       getStreamNode(vertexID).setResources(minResource, 
preferredResource);
+                       getStreamNode(vertexID).setResources(minResources, 
preferredResources);
                }
        }
 
@@ -477,6 +477,7 @@ public class StreamGraph extends StreamingPlan {
                StreamNode node = streamNodes.get(nodeId);
                if (node != null) {
                        node.setUserHash(nodeHash);
+
                }
        }
 
@@ -540,7 +541,9 @@ public class StreamGraph extends StreamingPlan {
                int sinkId,
                long timeout,
                int parallelism,
-               int maxParallelism) {
+               int maxParallelism,
+               ResourceSpec minResources,
+               ResourceSpec preferredResources) {
                StreamNode source = this.addNode(sourceId,
                        null,
                        StreamIterationHead.class,
@@ -549,6 +552,7 @@ public class StreamGraph extends StreamingPlan {
                sources.add(source.getId());
                setParallelism(source.getId(), parallelism);
                setMaxParallelism(source.getId(), maxParallelism);
+               setResources(source.getId(), minResources, preferredResources);
 
                StreamNode sink = this.addNode(sinkId,
                        null,

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index de87a66..f4d4071 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -203,7 +203,7 @@ public class StreamGraphGenerator {
                }
 
                if (transform.getMinResources() != null && 
transform.getPreferredResources() != null) {
-                       streamGraph.setResource(transform.getId(), 
transform.getMinResources(), transform.getPreferredResources());
+                       streamGraph.setResources(transform.getId(), 
transform.getMinResources(), transform.getPreferredResources());
                }
 
                return transformedIds;
@@ -335,7 +335,9 @@ public class StreamGraphGenerator {
                        getNewIterationNodeId(),
                        iterate.getWaitTime(),
                        iterate.getParallelism(),
-                       iterate.getMaxParallelism());
+                       iterate.getMaxParallelism(),
+                       iterate.getMinResources(),
+                       iterate.getPreferredResources() );
 
                StreamNode itSource = itSourceAndSink.f0;
                StreamNode itSink = itSourceAndSink.f1;
@@ -400,7 +402,9 @@ public class StreamGraphGenerator {
                                getNewIterationNodeId(),
                                coIterate.getWaitTime(),
                                coIterate.getParallelism(),
-                               coIterate.getMaxParallelism());
+                               coIterate.getMaxParallelism(),
+                               coIterate.getMinResources(),
+                               coIterate.getPreferredResources());
 
                StreamNode itSource = itSourceAndSink.f0;
                StreamNode itSink = itSourceAndSink.f1;

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 2d2e1e75..e4164ba 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -49,8 +49,8 @@ public class StreamNode implements Serializable {
         * dynamic scaling and the number of key groups used for partitioned 
state.
         */
        private int maxParallelism;
-       private ResourceSpec minResources;
-       private ResourceSpec preferredResources;
+       private ResourceSpec minResources = ResourceSpec.DEFAULT;
+       private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
        private Long bufferTimeout = null;
        private final String operatorName;
        private String slotSharingGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c18b527..5c1e1ac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -85,6 +86,9 @@ public class StreamingJobGraphGenerator {
        private Map<Integer, StreamConfig> vertexConfigs;
        private Map<Integer, String> chainedNames;
 
+       private Map<Integer, ResourceSpec> chainedMinResources;
+       private Map<Integer, ResourceSpec> chainedPreferredResources;
+
        private final StreamGraphHasher defaultStreamGraphHasher;
        private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
@@ -100,6 +104,8 @@ public class StreamingJobGraphGenerator {
                this.chainedConfigs = new HashMap<>();
                this.vertexConfigs = new HashMap<>();
                this.chainedNames = new HashMap<>();
+               this.chainedMinResources = new HashMap<>();
+               this.chainedPreferredResources = new HashMap<>();
                this.physicalEdgesInOrder = new ArrayList<>();
        }
 
@@ -211,6 +217,8 @@ public class StreamingJobGraphGenerator {
                        }
 
                        chainedNames.put(currentNodeId, 
createChainedName(currentNodeId, chainableOutputs));
+                       chainedMinResources.put(currentNodeId, 
createChainedMinResources(currentNodeId, chainableOutputs));
+                       chainedPreferredResources.put(currentNodeId, 
createChainedPreferredResources(currentNodeId, chainableOutputs));
 
                        StreamConfig config = currentNodeId.equals(startNodeId)
                                        ? createJobVertex(startNodeId, hashes, 
legacyHashes)
@@ -269,6 +277,22 @@ public class StreamingJobGraphGenerator {
                }
        }
 
+       private ResourceSpec createChainedMinResources(Integer vertexID, 
List<StreamEdge> chainedOutputs) {
+               ResourceSpec minResources = 
streamGraph.getStreamNode(vertexID).getMinResources();
+               for (StreamEdge chainable : chainedOutputs) {
+                       minResources = 
minResources.merge(chainedMinResources.get(chainable.getTargetId()));
+               }
+               return minResources;
+       }
+
+       private ResourceSpec createChainedPreferredResources(Integer vertexID, 
List<StreamEdge> chainedOutputs) {
+               ResourceSpec preferredResources = 
streamGraph.getStreamNode(vertexID).getPreferredResources();
+               for (StreamEdge chainable : chainedOutputs) {
+                       preferredResources = 
preferredResources.merge(chainedPreferredResources.get(chainable.getTargetId()));
+               }
+               return preferredResources;
+       }
+
        private StreamConfig createJobVertex(
                        Integer streamNodeId,
                        Map<Integer, byte[]> hashes,
@@ -308,6 +332,8 @@ public class StreamingJobGraphGenerator {
                                        legacyJobVertexIds);
                }
 
+               jobVertex.setResources(chainedMinResources.get(streamNodeId), 
chainedPreferredResources.get(streamNodeId));
+
                jobVertex.setInvokableClass(streamNode.getJobVertexClass());
 
                int parallelism = streamNode.getParallelism();

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 24b5736..e86b3e8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -132,13 +132,13 @@ public abstract class StreamTransformation<T> {
         *  The minimum resources for this stream transformation. It defines 
the lower limit for
         *  dynamic resources resize in future plan.
         */
-       private ResourceSpec minResources = ResourceSpec.UNKNOWN;
+       private ResourceSpec minResources = ResourceSpec.DEFAULT;
 
        /**
         *  The preferred resources for this stream transformation. It defines 
the upper limit for
         *  dynamic resource resize in future plan.
         */
-       private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
+       private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
 
        /**
         * User-specified ID for this transformation. This is used to assign the

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index b4d2421..5660655 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api;
 
+import java.lang.reflect.Method;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
@@ -26,6 +27,7 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -516,11 +518,10 @@ public class DataStreamTest {
        }
 
        /**
-        * Tests whether resource gets set.
+        * Tests whether resources get set.
         */
-       /*
        @Test
-       public void testResource() {
+       public void testResources() throws Exception{
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                ResourceSpec minResource1 = new ResourceSpec(1.0, 100);
@@ -542,23 +543,35 @@ public class DataStreamTest {
                ResourceSpec preferredResource6 = new ResourceSpec(2.0, 700);
 
                ResourceSpec minResource7 = new ResourceSpec(1.0, 700);
-               ResourceSpec maxResource7 = new ResourceSpec(2.0, 800);
+               ResourceSpec preferredResource7 = new ResourceSpec(2.0, 800);
+
+               Method opMethod = 
SingleOutputStreamOperator.class.getDeclaredMethod("setResources", 
ResourceSpec.class, ResourceSpec.class);
+               opMethod.setAccessible(true);
+
+               Method sinkMethod = 
DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class, 
ResourceSpec.class);
+               sinkMethod.setAccessible(true);
+
+               DataStream<Long> source1 = env.generateSequence(0, 0);
+               opMethod.invoke(source1, minResource1, preferredResource1);
 
-               DataStream<Long> source1 = env.generateSequence(0, 
0).setResource(minResource1, preferredResource1);
                DataStream<Long> map1 = source1.map(new MapFunction<Long, 
Long>() {
                        @Override
                        public Long map(Long value) throws Exception {
                                return null;
                        }
-               }).setResource(minResource2, preferredResource2);
+               });
+               opMethod.invoke(map1, minResource2, preferredResource2);
+
+               DataStream<Long> source2 = env.generateSequence(0, 0);
+               opMethod.invoke(source2, minResource3, preferredResource3);
 
-               DataStream<Long> source2 = env.generateSequence(0, 
0).setResource(minResource3, preferredResource3);
                DataStream<Long> map2 = source2.map(new MapFunction<Long, 
Long>() {
                        @Override
                        public Long map(Long value) throws Exception {
                                return null;
                        }
-               }).setResource(minResource4, preferredResource4);
+               });
+               opMethod.invoke(map2, minResource4, preferredResource4);
 
                DataStream<Long> connected = map1.connect(map2)
                                .flatMap(new CoFlatMapFunction<Long, Long, 
Long>() {
@@ -568,7 +581,8 @@ public class DataStreamTest {
                                        @Override
                                        public void flatMap2(Long value, 
Collector<Long> out) throws Exception {
                                        }
-                               }).setResource(minResource5, 
preferredResource5);
+                               });
+               opMethod.invoke(connected, minResource5, preferredResource5);
 
                DataStream<Long> windowed = connected
                                .windowAll(GlobalWindows.create())
@@ -580,31 +594,33 @@ public class DataStreamTest {
                                        public Long fold(Long accumulator, Long 
value) throws Exception {
                                                return null;
                                        }
-                               }).setResource(minResource6, 
preferredResource6);
+                               });
+               opMethod.invoke(windowed, minResource6, preferredResource6);
 
-               DataStreamSink<Long> sink = 
windowed.print().setResource(minResource7, maxResource7);
+               DataStreamSink<Long> sink = windowed.print();
+               sinkMethod.invoke(sink, minResource7, preferredResource7);
 
-               assertEquals(minResource1, 
env.getStreamGraph().getStreamNode(source1.getId()).getMinResource());
-               assertEquals(preferredResource1, 
env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResource());
+               assertEquals(minResource1, 
env.getStreamGraph().getStreamNode(source1.getId()).getMinResources());
+               assertEquals(preferredResource1, 
env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResources());
 
-               assertEquals(minResource2, 
env.getStreamGraph().getStreamNode(map1.getId()).getMinResource());
-               assertEquals(preferredResource2, 
env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResource());
+               assertEquals(minResource2, 
env.getStreamGraph().getStreamNode(map1.getId()).getMinResources());
+               assertEquals(preferredResource2, 
env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResources());
 
-               assertEquals(minResource3, 
env.getStreamGraph().getStreamNode(source2.getId()).getMinResource());
-               assertEquals(preferredResource3, 
env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResource());
+               assertEquals(minResource3, 
env.getStreamGraph().getStreamNode(source2.getId()).getMinResources());
+               assertEquals(preferredResource3, 
env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResources());
 
-               assertEquals(minResource4, 
env.getStreamGraph().getStreamNode(map2.getId()).getMinResource());
-               assertEquals(preferredResource4, 
env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResource());
+               assertEquals(minResource4, 
env.getStreamGraph().getStreamNode(map2.getId()).getMinResources());
+               assertEquals(preferredResource4, 
env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResources());
 
-               assertEquals(minResource5, 
env.getStreamGraph().getStreamNode(connected.getId()).getMinResource());
-               assertEquals(preferredResource5, 
env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResource());
+               assertEquals(minResource5, 
env.getStreamGraph().getStreamNode(connected.getId()).getMinResources());
+               assertEquals(preferredResource5, 
env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResources());
 
-               assertEquals(minResource6, 
env.getStreamGraph().getStreamNode(windowed.getId()).getMinResource());
-               assertEquals(preferredResource6, 
env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResource());
+               assertEquals(minResource6, 
env.getStreamGraph().getStreamNode(windowed.getId()).getMinResources());
+               assertEquals(preferredResource6, 
env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResources());
 
-               assertEquals(minResource7, 
env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResource());
-               assertEquals(maxResource7, 
env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResource());
-       }*/
+               assertEquals(minResource7, 
env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResources());
+               assertEquals(preferredResource7, 
env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResources());
+       }
 
        @Test
        public void testTypeInfo() {

http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 5f1973c..6d2fcaa 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,19 +17,29 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Map;
 
@@ -139,7 +149,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                StreamConfig sourceConfig = new 
StreamConfig(sourceVertex.getConfiguration());
                StreamConfig mapConfig = new 
StreamConfig(mapPrintVertex.getConfiguration());
                Map<Integer, StreamConfig> chainedConfigs = 
mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
-               StreamConfig printConfig = chainedConfigs.get(3);
+               StreamConfig printConfig = 
chainedConfigs.values().iterator().next();
 
                assertTrue(sourceConfig.isChainStart());
                assertTrue(sourceConfig.isChainEnd());
@@ -150,4 +160,151 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                assertFalse(printConfig.isChainStart());
                assertTrue(printConfig.isChainEnd());
        }
+
+       /**
+        * Verifies that the resources are merged correctly for chained 
operators (covers source and sink cases)
+        * when generating job graph
+        */
+       @Test
+       public void testResourcesForChainedSourceSink() throws Exception {
+               ResourceSpec resource1 = new ResourceSpec(0.1, 100);
+               ResourceSpec resource2 = new ResourceSpec(0.2, 200);
+               ResourceSpec resource3 = new ResourceSpec(0.3, 300);
+               ResourceSpec resource4 = new ResourceSpec(0.4, 400);
+               ResourceSpec resource5 = new ResourceSpec(0.5, 500);
+
+               Method opMethod = 
SingleOutputStreamOperator.class.getDeclaredMethod("setResources", 
ResourceSpec.class);
+               opMethod.setAccessible(true);
+
+               Method sinkMethod = 
DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
+               sinkMethod.setAccessible(true);
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<Integer, Integer>> source = env.addSource(new 
ParallelSourceFunction<Tuple2<Integer, Integer>>() {
+                       @Override
+                       public void run(SourceContext<Tuple2<Integer, Integer>> 
ctx) throws Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               });
+               opMethod.invoke(source, resource1);
+
+               DataStream<Tuple2<Integer, Integer>> map = source.map(new 
MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+                       @Override
+                       public Tuple2<Integer, Integer> map(Tuple2<Integer, 
Integer> value) throws Exception {
+                               return value;
+                       }
+               });
+               opMethod.invoke(map, resource2);
+
+               // CHAIN(Source -> Map -> Filter)
+               DataStream<Tuple2<Integer, Integer>> filter = map.filter(new 
FilterFunction<Tuple2<Integer, Integer>>() {
+                       @Override
+                       public boolean filter(Tuple2<Integer, Integer> value) 
throws Exception {
+                               return false;
+                       }
+               });
+               opMethod.invoke(filter, resource3);
+
+               DataStream<Tuple2<Integer, Integer>> reduce = 
filter.keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
+                       @Override
+                       public Tuple2<Integer, Integer> reduce(Tuple2<Integer, 
Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
+                               return new Tuple2<>(value1.f0, value1.f1 + 
value2.f1);
+                       }
+               });
+               opMethod.invoke(reduce, resource4);
+
+               DataStreamSink<Tuple2<Integer, Integer>> sink = 
reduce.addSink(new SinkFunction<Tuple2<Integer, Integer>>() {
+                       @Override
+                       public void invoke(Tuple2<Integer, Integer> value) 
throws Exception {
+                       }
+               });
+               sinkMethod.invoke(sink, resource5);
+
+               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+
+               JobVertex sourceMapFilterVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+               JobVertex reduceSinkVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+
+               
assertTrue(sourceMapFilterVertex.getMinResources().equals(resource1.merge(resource2).merge(resource3)));
+               
assertTrue(reduceSinkVertex.getPreferredResources().equals(resource4.merge(resource5)));
+       }
+
+       /**
+        * Verifies that the resources are merged correctly for chained 
operators (covers middle chaining and iteration cases)
+        * when generating job graph
+        */
+       @Test
+       public void testResourcesForIteration() throws Exception {
+               ResourceSpec resource1 = new ResourceSpec(0.1, 100);
+               ResourceSpec resource2 = new ResourceSpec(0.2, 200);
+               ResourceSpec resource3 = new ResourceSpec(0.3, 300);
+               ResourceSpec resource4 = new ResourceSpec(0.4, 400);
+               ResourceSpec resource5 = new ResourceSpec(0.5, 500);
+
+               Method opMethod = 
SingleOutputStreamOperator.class.getDeclaredMethod("setResources", 
ResourceSpec.class);
+               opMethod.setAccessible(true);
+
+               Method sinkMethod = 
DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
+               sinkMethod.setAccessible(true);
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Integer> source = env.addSource(new 
ParallelSourceFunction<Integer>() {
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).name("test_source");
+               opMethod.invoke(source, resource1);
+
+               IterativeStream<Integer> iteration = source.iterate(3000);
+               opMethod.invoke(iteration, resource2);
+
+               DataStream<Integer> flatMap = iteration.flatMap(new 
FlatMapFunction<Integer, Integer>() {
+                       @Override
+                       public void flatMap(Integer value, Collector<Integer> 
out) throws Exception {
+                               out.collect(value);
+                       }
+               }).name("test_flatMap");
+               opMethod.invoke(flatMap, resource3);
+
+               // CHAIN(flatMap -> Filter)
+               DataStream<Integer> increment = flatMap.filter(new 
FilterFunction<Integer>() {
+                       @Override
+                       public boolean filter(Integer value) throws Exception {
+                               return false;
+                       }
+               }).name("test_filter");
+               opMethod.invoke(increment, resource4);
+
+               DataStreamSink<Integer> sink = 
iteration.closeWith(increment).addSink(new SinkFunction<Integer>() {
+                       @Override
+                       public void invoke(Integer value) throws Exception {
+                       }
+               }).disableChaining().name("test_sink");
+               sinkMethod.invoke(sink, resource5);
+
+               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+
+               for (JobVertex jobVertex : jobGraph.getVertices()) {
+                       if (jobVertex.getName().contains("test_source")) {
+                               
assertTrue(jobVertex.getMinResources().equals(resource1));
+                       } else if 
(jobVertex.getName().contains("Iteration_Source")) {
+                               
assertTrue(jobVertex.getPreferredResources().equals(resource2));
+                       } else if 
(jobVertex.getName().contains("test_flatMap")) {
+                               
assertTrue(jobVertex.getMinResources().equals(resource3.merge(resource4)));
+                       } else if 
(jobVertex.getName().contains("Iteration_Tail")) {
+                               
assertTrue(jobVertex.getPreferredResources().equals(ResourceSpec.DEFAULT));
+                       } else if (jobVertex.getName().contains("test_sink")) {
+                               
assertTrue(jobVertex.getMinResources().equals(resource5));
+                       }
+               }
+       }
 }

Reply via email to