[FLINK-5134] [runtime] Aggregate ResourceSpec for chained operators when generating job graph
This closes #3455 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/980d072f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/980d072f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/980d072f Branch: refs/heads/master Commit: 980d072fa2546dbc10cf878cf29532b2d8bbca8a Parents: 24408e1 Author: 淘江 <taojiang....@alibaba-inc.com> Authored: Thu Mar 2 13:51:39 2017 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Mar 16 14:43:27 2017 +0100 ---------------------------------------------------------------------- .../flink/api/common/operators/Operator.java | 12 +- .../api/common/operators/ResourceSpec.java | 2 +- .../flink/api/java/operators/DataSink.java | 90 ++++---- .../api/java/operators/DeltaIteration.java | 88 ++++---- .../flink/api/java/operators/Operator.java | 96 +++++---- .../api/java/operators/OperatorTranslation.java | 12 +- .../flink/api/java/operator/OperatorTest.java | 22 +- .../plantranslate/JobGraphGenerator.java | 14 +- .../plantranslate/JobGraphGeneratorTest.java | 209 +++++++++++++++++++ .../flink/runtime/jobgraph/JobVertex.java | 38 ++++ .../api/datastream/DataStreamSink.java | 79 +++---- .../datastream/SingleOutputStreamOperator.java | 78 +++---- .../flink/streaming/api/graph/StreamGraph.java | 10 +- .../api/graph/StreamGraphGenerator.java | 10 +- .../flink/streaming/api/graph/StreamNode.java | 4 +- .../api/graph/StreamingJobGraphGenerator.java | 26 +++ .../transformations/StreamTransformation.java | 4 +- .../flink/streaming/api/DataStreamTest.java | 68 +++--- .../graph/StreamingJobGraphGeneratorTest.java | 159 +++++++++++++- 19 files changed, 739 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java ---------------------------------------------------------------------- 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java index 1905555..6d906f2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java @@ -28,8 +28,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Visitable; -import javax.annotation.Nullable; - /** * Abstract base class for all operators. An operator is a source, sink, or it applies an operation to * one or more inputs, producing a result. @@ -47,11 +45,9 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use - @Nullable - private ResourceSpec minResources; // the minimum resource of the contract instance. + private ResourceSpec minResources = ResourceSpec.DEFAULT; // the minimum resource of the contract instance. - @Nullable - private ResourceSpec preferredResources; // the preferred resource of the contract instance. + private ResourceSpec preferredResources = ResourceSpec.DEFAULT; // the preferred resource of the contract instance. /** * The return type of the user function. @@ -199,7 +195,6 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> { * * @return The minimum resources of this operator. */ - @Nullable @PublicEvolving public ResourceSpec getMinResources() { return this.minResources; @@ -211,7 +206,6 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> { * * @return The preferred resource of this operator. 
*/ - @Nullable @PublicEvolving public ResourceSpec getPreferredResources() { return this.preferredResources; @@ -225,7 +219,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> { * @param preferredResources The preferred resource of this operator. */ @PublicEvolving - public void setResource(ResourceSpec minResources, ResourceSpec preferredResources) { + public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) { this.minResources = minResources; this.preferredResources = preferredResources; } http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java index 0ea289a..cd3e5ad 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java @@ -44,7 +44,7 @@ public class ResourceSpec implements Serializable { private static final long serialVersionUID = 1L; - public static final ResourceSpec UNKNOWN = new ResourceSpec(0, 0, 0, 0, 0); + public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0); /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */ private final double cpuCores; http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 369e013..fd8190c 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -52,9 +52,9 @@ public class DataSink<T> { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; - private ResourceSpec minResources = ResourceSpec.UNKNOWN; + private ResourceSpec minResources = ResourceSpec.DEFAULT; - private ResourceSpec preferredResources = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResources = ResourceSpec.DEFAULT; private Configuration parameters; @@ -306,49 +306,45 @@ public class DataSink<T> { return this.preferredResources; } -// --------------------------------------------------------------------------- -// Fine-grained resource profiles are an incomplete work-in-progress feature -// The setters are hence commented out at this point. -// --------------------------------------------------------------------------- -// -// /** -// * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource. -// * The minimum resource must be satisfied and the preferred resource specifies the upper bound -// * for dynamic resource resize. -// * -// * @param minResources The minimum resource for this data sink. -// * @param preferredResources The preferred resource for this data sink. -// * @return The data sink with set minimum and preferred resources. 
-// */ -// @PublicEvolving -// public DataSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { -// Preconditions.checkNotNull(minResources, "The min resources must be not null."); -// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); -// Preconditions.checkArgument(minResources.isValid() && -// preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), -// "The values in resource must be not less than 0 and the preferred " + -// "resource must be greater than the min resource."); -// -// this.minResources = minResources; -// this.preferredResources = preferredResources; -// -// return this; -// } -// -// /** -// * Sets the resources for this data sink. This overrides the default resource profile. -// * -// * @param resources The resources for this data sink. -// * @return The data sink with set minimum and preferred resources. -// */ -// @PublicEvolving -// public DataSink<T> setResources(ResourceSpec resources) { -// Preconditions.checkNotNull(resources, "The resources must be not null."); -// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); -// -// this.minResources = resources; -// this.preferredResources = resources; -// -// return this; -// } + // --------------------------------------------------------------------------- + // Fine-grained resource profiles are an incomplete work-in-progress feature + // The setters are hence private at this point. + // --------------------------------------------------------------------------- + + /** + * Sets the minimum and preferred resources for this data sink. and the lower and upper resource limits + * will be considered in resource resize feature for future plan. + * + * @param minResources The minimum resources for this data sink. + * @param preferredResources The preferred resources for this data sink. + * @return The data sink with set minimum and preferred resources. 
+ */ + private DataSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + Preconditions.checkNotNull(minResources, "The min resources must be not null."); + Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); + Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), + "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources."); + + this.minResources = minResources; + this.preferredResources = preferredResources; + + return this; + } + + /** + * Sets the resources for this data sink, and the minimum and preferred resources are the same by default. + * + * @param resources The resources for this data sink. + * @return The data sink with set minimum and preferred resources. + */ + private DataSink<T> setResources(ResourceSpec resources) { + Preconditions.checkNotNull(resources, "The resources must be not null."); + Preconditions.checkArgument(resources.isValid(), "The values in resources must be not less than 0."); + + this.minResources = resources; + this.preferredResources = resources; + + return this; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index 3d327e9..61f83b1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -64,9 +64,9 @@ public class DeltaIteration<ST, WT> { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; - private ResourceSpec minResources = 
ResourceSpec.UNKNOWN; + private ResourceSpec minResources = ResourceSpec.DEFAULT; - private ResourceSpec preferredResources = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResources = ResourceSpec.DEFAULT; private boolean solutionSetUnManaged; @@ -197,49 +197,47 @@ public class DeltaIteration<ST, WT> { return parallelism; } -// --------------------------------------------------------------------------- -// Fine-grained resource profiles are an incomplete work-in-progress feature -// The setters are hence commented out at this point. -// --------------------------------------------------------------------------- -// -// /** -// * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource. -// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. -// * -// * @param minResources The minimum resource for the iteration. -// * @param preferredResources The preferred resource for the iteration. -// * @return The iteration with set minimum and preferred resources. -// */ -// @PublicEvolving -// public DeltaIteration<ST, WT> setResource(ResourceSpec minResources, ResourceSpec preferredResources) { -// Preconditions.checkNotNull(minResources, "The min resources must be not null."); -// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); -// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), -// "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources."); -// -// this.minResources = minResources; -// this.preferredResources = preferredResources; -// -// return this; -// } -// -// /** -// * Sets the resource for the iteration, and the minimum and preferred resources are the same by default. 
-// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. -// * -// * @param resources The resource for the iteration. -// * @return The iteration with set minimum and preferred resources. -// */ -// @PublicEvolving -// public DeltaIteration<ST, WT> setResource(ResourceSpec resources) { -// Preconditions.checkNotNull(resources, "The resources must be not null."); -// Preconditions.checkArgument(resources.isValid(), "The values in resource must be not less than 0."); -// -// this.minResources = resources; -// this.preferredResources = resources; -// -// return this; -// } + // --------------------------------------------------------------------------- + // Fine-grained resource profiles are an incomplete work-in-progress feature + // The setters are hence private at this point. + // --------------------------------------------------------------------------- + + /** + * Sets the minimum and preferred resources for the iteration. This overrides the default resources. + * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. + * + * @param minResources The minimum resources for the iteration. + * @param preferredResources The preferred resources for the iteration. + * @return The iteration with set minimum and preferred resources. 
+ */ + private DeltaIteration<ST, WT> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + Preconditions.checkNotNull(minResources, "The min resources must be not null."); + Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); + Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), + "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources."); + + this.minResources = minResources; + this.preferredResources = preferredResources; + + return this; + } + + /** + * Sets the resources for the iteration, and the minimum and preferred resources are the same by default. + * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. + * + * @param resources The resources for the iteration. + * @return The iteration with set minimum and preferred resources. + */ + private DeltaIteration<ST, WT> setResources(ResourceSpec resources) { + Preconditions.checkNotNull(resources, "The resources must be not null."); + Preconditions.checkArgument(resources.isValid(), "The values in resources must be not less than 0."); + + this.minResources = resources; + this.preferredResources = resources; + + return this; + } /** * Gets the minimum resources from this iteration. 
If no minimum resources have been set, http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java index 6ae59dd..e496c62 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java @@ -39,9 +39,9 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet< protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; - protected ResourceSpec minResources = ResourceSpec.UNKNOWN; + protected ResourceSpec minResources = ResourceSpec.DEFAULT; - protected ResourceSpec preferredResources = ResourceSpec.UNKNOWN; + protected ResourceSpec preferredResources = ResourceSpec.DEFAULT; protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) { @@ -130,51 +130,49 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet< return returnType; } -// --------------------------------------------------------------------------- -// Fine-grained resource profiles are an incomplete work-in-progress feature -// The setters are hence commented out at this point. -// --------------------------------------------------------------------------- -// -// /** -// * Sets the minimum and preferred resources for this operator. This overrides the default empty resource. -// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. -// * -// * @param minResources The minimum resource for this operator. -// * @param preferredResources The preferred resource for this operator. -// * @return The operator with set minimum and preferred resources. 
-// */ -// @PublicEvolving -// public O setResources(ResourceSpec minResources, ResourceSpec preferredResources) { -// Preconditions.checkNotNull(minResources, "The min resources must be not null."); -// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); -// -// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), -// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); -// -// this.minResources = minResources; -// this.preferredResources = preferredResources; -// -// @SuppressWarnings("unchecked") -// O returnType = (O) this; -// return returnType; -// } -// -// /** -// * Sets the resources for this operator. This overrides the default minimum and preferred resources. -// * -// * @param resources The resource for this operator. -// * @return The operator with set minimum and preferred resources. -// */ -// @PublicEvolving -// public O setResources(ResourceSpec resources) { -// Preconditions.checkNotNull(resources, "The resource must be not null."); -// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); -// -// this.minResources = resources; -// this.preferredResources = resources; -// -// @SuppressWarnings("unchecked") -// O returnType = (O) this; -// return returnType; -// } + // --------------------------------------------------------------------------- + // Fine-grained resource profiles are an incomplete work-in-progress feature + // The setters are hence private at this point. + // --------------------------------------------------------------------------- + + /** + * Sets the minimum and preferred resources for this operator. This overrides the default resources. + * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. 
+ * + * @param minResources The minimum resources for this operator. + * @param preferredResources The preferred resources for this operator. + * @return The operator with set minimum and preferred resources. + */ + private O setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + Preconditions.checkNotNull(minResources, "The min resources must be not null."); + Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); + + Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), + "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources."); + + this.minResources = minResources; + this.preferredResources = preferredResources; + + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } + + /** + * Sets the resources for this operator. This overrides the default minimum and preferred resources. + * + * @param resources The resources for this operator. + * @return The operator with set minimum and preferred resources. 
+ */ + private O setResources(ResourceSpec resources) { + Preconditions.checkNotNull(resources, "The resources must be not null."); + Preconditions.checkArgument(resources.isValid(), "The values in resources must be not less than 0."); + + this.minResources = resources; + this.preferredResources = resources; + + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java index 3bffd8b..cdbec71 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java @@ -64,7 +64,7 @@ public class OperatorTranslation { // translate the sink itself and connect it to the input GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input); - translatedSink.setResource(sink.getMinResources(), sink.getPreferredResources()); + translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources()); return translatedSink; } @@ -95,28 +95,28 @@ public class OperatorTranslation { if (dataSet instanceof DataSource) { DataSource<T> dataSource = (DataSource<T>) dataSet; dataFlowOp = dataSource.translateToDataFlow(); - dataFlowOp.setResource(dataSource.getMinResources(), dataSource.getPreferredResources()); + dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources()); } else if (dataSet instanceof SingleInputOperator) { SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet; dataFlowOp = translateSingleInputOperator(singleInputOperator); - 
dataFlowOp.setResource(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources()); + dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources()); } else if (dataSet instanceof TwoInputOperator) { TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet; dataFlowOp = translateTwoInputOperator(twoInputOperator); - dataFlowOp.setResource(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources()); + dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources()); } else if (dataSet instanceof BulkIterationResultSet) { BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet; dataFlowOp = translateBulkIteration(bulkIterationResultSet); - dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResources(), + dataFlowOp.setResources(bulkIterationResultSet.getIterationHead().getMinResources(), bulkIterationResultSet.getIterationHead().getPreferredResources()); } else if (dataSet instanceof DeltaIterationResultSet) { DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet; dataFlowOp = translateDeltaIteration(deltaIterationResultSet); - dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResources(), + dataFlowOp.setResources(deltaIterationResultSet.getIterationHead().getMinResources(), deltaIterationResultSet.getIterationHead().getPreferredResources()); } else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) { http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java index 992acc9..3de254b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java @@ -25,6 +25,8 @@ import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.junit.Test; +import java.lang.reflect.Method; + import static org.junit.Assert.assertEquals; public class OperatorTest { @@ -46,19 +48,21 @@ public class OperatorTest { assertEquals(parallelism, operator.getParallelism()); } - /* @Test - public void testConfigurationOfResource() { + public void testConfigurationOfResource() throws Exception{ Operator operator = new MockOperator(); - // verify explicit change in resource - ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0); - ResourceSpec preferredResource = new ResourceSpec(2.0, 200, 0, 0, 0); - operator.setResource(minResource, preferredResource); + Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class); + opMethod.setAccessible(true); + + // verify explicit change in resources + ResourceSpec minResources = new ResourceSpec(1.0, 100); + ResourceSpec preferredResources = new ResourceSpec(2.0, 200); + opMethod.invoke(operator, minResources, preferredResources); - assertEquals(minResource, operator.getMinResource()); - assertEquals(preferredResource, operator.getPreferredResource()); - }*/ + assertEquals(minResources, operator.getMinResources()); + assertEquals(preferredResources, operator.getPreferredResources()); + } private class MockOperator extends Operator { public MockOperator() { http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index bbc944e..a407bfe 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -538,6 +538,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> { } else { container.setName("CHAIN " + containerTaskName + " -> " + chainedTask.getTaskName()); } + + //update resource of container task + container.setResources(container.getMinResources().merge(node.getMinResources()), + container.getPreferredResources().merge(node.getPreferredResources())); this.chainedTasksInSequence.add(chainedTask); return; @@ -837,6 +841,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { } else { // create task vertex vertex = new JobVertex(taskName); + vertex.setResources(node.getMinResources(), node.getPreferredResources()); vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class); config = new TaskConfig(vertex.getConfiguration()); @@ -862,6 +867,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { final DriverStrategy ds = node.getDriverStrategy(); final JobVertex vertex = new JobVertex(taskName); final TaskConfig config = new TaskConfig(vertex.getConfiguration()); + vertex.setResources(node.getMinResources(), node.getPreferredResources()); vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? 
IterationIntermediateTask.class : BatchTask.class); // set user code @@ -890,6 +896,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName()); final TaskConfig config = new TaskConfig(vertex.getConfiguration()); + vertex.setResources(node.getMinResources(), node.getPreferredResources()); vertex.setInvokableClass(DataSourceTask.class); vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper())); @@ -905,6 +912,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName()); final TaskConfig config = new TaskConfig(vertex.getConfiguration()); + vertex.setResources(node.getMinResources(), node.getPreferredResources()); vertex.setInvokableClass(DataSinkTask.class); vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper())); @@ -967,6 +975,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { // everything else happens in the post visit, after the input (the initial partial solution) // is connected. headVertex = new JobVertex("PartialSolution ("+iteration.getNodeName()+")"); + headVertex.setResources(iteration.getMinResources(), iteration.getPreferredResources()); headVertex.setInvokableClass(IterationHeadTask.class); headConfig = new TaskConfig(headVertex.getConfiguration()); headConfig.setDriver(NoOpDriver.class); @@ -1035,6 +1044,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { // everything else happens in the post visit, after the input (the initial partial solution) // is connected. 
headVertex = new JobVertex("IterationHead("+iteration.getNodeName()+")"); + headVertex.setResources(iteration.getMinResources(), iteration.getPreferredResources()); headVertex.setInvokableClass(IterationHeadTask.class); headConfig = new TaskConfig(headVertex.getConfiguration()); headConfig.setDriver(NoOpDriver.class); @@ -1275,6 +1285,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { // --------------------------- create the sync task --------------------------- final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")"); + sync.setResources(bulkNode.getMinResources(), bulkNode.getPreferredResources()); sync.setInvokableClass(IterationSynchronizationSinkTask.class); sync.setParallelism(1); sync.setMaxParallelism(1); @@ -1412,6 +1423,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { final TaskConfig syncConfig; { final JobVertex sync = new JobVertex("Sync (" + iterNode.getNodeName() + ")"); + sync.setResources(iterNode.getMinResources(), iterNode.getPreferredResources()); sync.setInvokableClass(IterationSynchronizationSinkTask.class); sync.setParallelism(1); sync.setMaxParallelism(1); @@ -1468,7 +1480,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { if (hasWorksetTail) { nextWorksetVertex.setInvokableClass(IterationTailTask.class); - + worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java new file mode 100644 index 0000000..d8ee80b --- /dev/null +++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.plantranslate; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.aggregators.LongSumAggregator; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.operators.Operator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import 
org.junit.Test; + +import java.lang.reflect.Method; + +import static org.junit.Assert.assertTrue; + +public class JobGraphGeneratorTest { + + /** + * Verifies that the resources are merged correctly for chained operators when + * generating job graph + */ + @Test + public void testResourcesForChainedOperators() throws Exception { + ResourceSpec resource1 = new ResourceSpec(0.1, 100); + ResourceSpec resource2 = new ResourceSpec(0.2, 200); + ResourceSpec resource3 = new ResourceSpec(0.3, 300); + ResourceSpec resource4 = new ResourceSpec(0.4, 400); + ResourceSpec resource5 = new ResourceSpec(0.5, 500); + ResourceSpec resource6 = new ResourceSpec(0.6, 600); + ResourceSpec resource7 = new ResourceSpec(0.7, 700); + + Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class); + opMethod.setAccessible(true); + + Method sinkMethod = DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class); + sinkMethod.setAccessible(true); + + MapFunction<Long, Long> mapFunction = new MapFunction<Long, Long>() { + @Override + public Long map(Long value) throws Exception { + return value; + } + }; + + FilterFunction<Long> filterFunction = new FilterFunction<Long>() { + @Override + public boolean filter(Long value) throws Exception { + return false; + } + }; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> input = env.fromElements(1L, 2L, 3L); + opMethod.invoke(input, resource1); + + DataSet<Long> map1 = input.map(mapFunction); + opMethod.invoke(map1, resource2); + + // CHAIN(Source -> Map -> Filter) + DataSet<Long> filter1 = map1.filter(filterFunction); + opMethod.invoke(filter1, resource3); + + IterativeDataSet<Long> startOfIteration = filter1.iterate(10); + opMethod.invoke(startOfIteration, resource4); + + DataSet<Long> map2 = startOfIteration.map(mapFunction); + opMethod.invoke(map2, resource5); + + // CHAIN(Map -> Filter) + DataSet<Long> feedback = map2.filter(filterFunction); + 
opMethod.invoke(feedback, resource6); + + DataSink<Long> sink = startOfIteration.closeWith(feedback).output(new DiscardingOutputFormat<Long>()); + sinkMethod.invoke(sink, resource7); + + Plan plan = env.createProgramPlan(); + Optimizer pc = new Optimizer(new Configuration()); + OptimizedPlan op = pc.compile(plan); + + JobGraphGenerator jgg = new JobGraphGenerator(); + JobGraph jobGraph = jgg.compileJobGraph(op); + + JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); + JobVertex iterationHeadVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); + JobVertex feedbackVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(2); + JobVertex sinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(3); + JobVertex iterationSyncVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(4); + + assertTrue(sourceMapFilterVertex.getMinResources().equals(resource1.merge(resource2).merge(resource3))); + assertTrue(iterationHeadVertex.getPreferredResources().equals(resource4)); + assertTrue(feedbackVertex.getMinResources().equals(resource5.merge(resource6))); + assertTrue(sinkVertex.getPreferredResources().equals(resource7)); + assertTrue(iterationSyncVertex.getMinResources().equals(resource4)); + } + + /** + * Verifies that the resources are set onto each job vertex correctly when generating job graph + * which covers the delta iteration case + */ + @Test + public void testResourcesForDeltaIteration() throws Exception{ + ResourceSpec resource1 = new ResourceSpec(0.1, 100); + ResourceSpec resource2 = new ResourceSpec(0.2, 200); + ResourceSpec resource3 = new ResourceSpec(0.3, 300); + ResourceSpec resource4 = new ResourceSpec(0.4, 400); + ResourceSpec resource5 = new ResourceSpec(0.5, 500); + ResourceSpec resource6 = new ResourceSpec(0.6, 600); + + Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class); + opMethod.setAccessible(true); + + Method deltaMethod = 
DeltaIteration.class.getDeclaredMethod("setResources", ResourceSpec.class); + deltaMethod.setAccessible(true); + + Method sinkMethod = DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class); + sinkMethod.setAccessible(true); + + MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> mapFunction = new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception { + return value; + } + }; + + FilterFunction<Tuple2<Long, Long>> filterFunction = new FilterFunction<Tuple2<Long, Long>>() { + @Override + public boolean filter(Tuple2<Long, Long> value) throws Exception { + return false; + } + }; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<>(1L, 2L)); + opMethod.invoke(input, resource1); + + // CHAIN(Map -> Filter) + DataSet<Tuple2<Long, Long>> map = input.map(mapFunction); + opMethod.invoke(map, resource2); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = map.iterateDelta(map, 100, 0). + registerAggregator("test", new LongSumAggregator()); + deltaMethod.invoke(iteration, resource3); + + DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset().map(mapFunction); + opMethod.invoke(delta, resource4); + + DataSet<Tuple2<Long, Long>> feedback = delta.filter(filterFunction); + opMethod.invoke(feedback, resource5); + + DataSink<Tuple2<Long, Long>> sink = iteration.closeWith(delta, feedback). 
+ output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + sinkMethod.invoke(sink, resource6); + + Plan plan = env.createProgramPlan(); + Optimizer pc = new Optimizer(new Configuration()); + OptimizedPlan op = pc.compile(plan); + + JobGraphGenerator jgg = new JobGraphGenerator(); + JobGraph jobGraph = jgg.compileJobGraph(op); + + JobVertex sourceMapVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); + JobVertex iterationHeadVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); + JobVertex deltaVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(2); + JobVertex iterationTailVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(3); + JobVertex feedbackVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(4); + JobVertex sinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(5); + JobVertex iterationSyncVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(6); + + assertTrue(sourceMapVertex.getMinResources().equals(resource1.merge(resource2))); + assertTrue(iterationHeadVertex.getPreferredResources().equals(resource3)); + assertTrue(deltaVertex.getMinResources().equals(resource4)); + // the iteration tail task will be scheduled in the same instance with iteration head, and currently not set resources. 
+ assertTrue(iterationTailVertex.getPreferredResources().equals(ResourceSpec.DEFAULT)); + assertTrue(feedbackVertex.getMinResources().equals(resource5)); + assertTrue(sinkVertex.getPreferredResources().equals(resource6)); + assertTrue(iterationSyncVertex.getMinResources().equals(resource3)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 260bd74..1180db4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobgraph; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -31,6 +32,8 @@ import org.apache.flink.util.Preconditions; import java.util.ArrayList; import java.util.List; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * The base class for job vertexes. */ @@ -61,6 +64,12 @@ public class JobVertex implements java.io.Serializable { /** Maximum number of subtasks to split this taks into a runtime. */ private int maxParallelism = -1; + /** The minimum resource of the vertex */ + private ResourceSpec minResources = ResourceSpec.DEFAULT; + + /** The preferred resource of the vertex */ + private ResourceSpec preferredResources = ResourceSpec.DEFAULT; + /** Custom configuration passed to the assigned task at runtime. 
*/ private Configuration configuration; @@ -278,6 +287,35 @@ public class JobVertex implements java.io.Serializable { this.maxParallelism = maxParallelism; } + /** + * Gets the minimum resource for the task. + * + * @return The minimum resource for the task. + */ + public ResourceSpec getMinResources() { + return minResources; + } + + /** + * Gets the preferred resource for the task. + * + * @return The preferred resource for the task. + */ + public ResourceSpec getPreferredResources() { + return preferredResources; + } + + /** + * Sets the minimum and preferred resources for the task. + * + * @param minResources The minimum resource for the task. + * @param preferredResources The preferred resource for the task. + */ + public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + this.minResources = checkNotNull(minResources); + this.preferredResources = checkNotNull(preferredResources); + } + public InputSplitSource<?> getInputSplitSource() { return inputSplitSource; } http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 39d81c6..b0e78d7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -20,9 +20,11 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.operators.ResourceSpec; import 
org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.util.Preconditions; /** * A Stream Sink. This is used for emitting elements from a streaming topology. @@ -113,45 +115,44 @@ public class DataStreamSink<T> { return this; } -// --------------------------------------------------------------------------- -// Fine-grained resource profiles are an incomplete work-in-progress feature -// The setters are hence commented out at this point. -// --------------------------------------------------------------------------- -// /** -// * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will -// * be considered in resource resize feature for future plan. -// * -// * @param minResources The minimum resources for this sink. -// * @param preferredResources The preferred resources for this sink -// * @return The sink with set minimum and preferred resources. -// */ -// @PublicEvolving -// public DataStreamSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { -// Preconditions.checkNotNull(minResources, "The min resources must be not null."); -// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); -// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), -// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); -// -// transformation.setResources(minResources, preferredResources); -// -// return this; -// } -// -// /** -// * Sets the resource for this sink, the minimum and preferred resources are the same by default. -// * -// * @param resources The resource for this sink. -// * @return The sink with set minimum and preferred resources. 
-// */ -// @PublicEvolving -// public DataStreamSink<T> setResources(ResourceSpec resources) { -// Preconditions.checkNotNull(resources, "The resource must be not null."); -// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); -// -// transformation.setResources(resources, resources); -// -// return this; -// } + // --------------------------------------------------------------------------- + // Fine-grained resource profiles are an incomplete work-in-progress feature + // The setters are hence private at this point. + // --------------------------------------------------------------------------- + + /** + * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will + * be considered in resource resize feature for future plan. + * + * @param minResources The minimum resources for this sink. + * @param preferredResources The preferred resources for this sink + * @return The sink with set minimum and preferred resources. + */ + private DataStreamSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + Preconditions.checkNotNull(minResources, "The min resources must be not null."); + Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); + Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), + "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources."); + + transformation.setResources(minResources, preferredResources); + + return this; + } + + /** + * Sets the resources for this sink, the minimum and preferred resources are the same by default. + * + * @param resources The resources for this sink. + * @return The sink with set minimum and preferred resources. 
+ */ + private DataStreamSink<T> setResources(ResourceSpec resources) { + Preconditions.checkNotNull(resources, "The resources must be not null."); + Preconditions.checkArgument(resources.isValid(), "The values in resources must be not less than 0."); + + transformation.setResources(resources, resources); + + return this; + } /** * Turns off chaining for this operator so thread co-location will not be http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 859c6d5..f1d5e3a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -154,45 +155,44 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { return this; } -// --------------------------------------------------------------------------- -// Fine-grained resource profiles are an incomplete work-in-progress feature -// The setters are hence commented out at this point. 
-// --------------------------------------------------------------------------- -// /** -// * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will -// * be considered in dynamic resource resize feature for future plan. -// * -// * @param minResources The minimum resources for this operator. -// * @param preferredResources The preferred resources for this operator. -// * @return The operator with set minimum and preferred resources. -// */ -// @PublicEvolving -// public SingleOutputStreamOperator<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { -// Preconditions.checkNotNull(minResources, "The min resources must be not null."); -// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); -// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResource.lessThanOrEqual(preferredResources), -// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); -// -// transformation.setResources(minResources, preferredResources); -// -// return this; -// } -// -// /** -// * Sets the resources for this operator, the minimum and preferred resources are the same by default. -// * -// * @param resources The resources for this operator. -// * @return The operator with set minimum and preferred resources. -// */ -// @PublicEvolving -// public SingleOutputStreamOperator<T> setResources(ResourceSpec resources) { -// Preconditions.checkNotNull(resources, "The resource must be not null."); -// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); -// -// transformation.setResources(resources, resources); -// -// return this; -// } + // --------------------------------------------------------------------------- + // Fine-grained resource profiles are an incomplete work-in-progress feature + // The setters are hence private at this point. 
+ // --------------------------------------------------------------------------- + + /** + * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will + * be considered in dynamic resource resize feature for future plan. + * + * @param minResources The minimum resources for this operator. + * @param preferredResources The preferred resources for this operator. + * @return The operator with set minimum and preferred resources. + */ + private SingleOutputStreamOperator<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + Preconditions.checkNotNull(minResources, "The min resources must be not null."); + Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); + Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), + "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources."); + + transformation.setResources(minResources, preferredResources); + + return this; + } + + /** + * Sets the resources for this operator, the minimum and preferred resources are the same by default. + * + * @param resources The resources for this operator. + * @return The operator with set minimum and preferred resources. 
+ */ + private SingleOutputStreamOperator<T> setResources(ResourceSpec resources) { + Preconditions.checkNotNull(resources, "The resources must be not null."); + Preconditions.checkArgument(resources.isValid(), "The values in resources must be not less than 0."); + + transformation.setResources(resources, resources); + + return this; + } private boolean canBeParallel() { return !nonParallel; http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index a87e63d..c1775e4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -414,9 +414,9 @@ public class StreamGraph extends StreamingPlan { } } - public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) { + public void setResources(int vertexID, ResourceSpec minResources, ResourceSpec preferredResources) { if (getStreamNode(vertexID) != null) { - getStreamNode(vertexID).setResources(minResource, preferredResource); + getStreamNode(vertexID).setResources(minResources, preferredResources); } } @@ -477,6 +477,7 @@ public class StreamGraph extends StreamingPlan { StreamNode node = streamNodes.get(nodeId); if (node != null) { node.setUserHash(nodeHash); + } } @@ -540,7 +541,9 @@ public class StreamGraph extends StreamingPlan { int sinkId, long timeout, int parallelism, - int maxParallelism) { + int maxParallelism, + ResourceSpec minResources, + ResourceSpec preferredResources) { StreamNode source = this.addNode(sourceId, null, StreamIterationHead.class, @@ -549,6 +552,7 @@ public class 
StreamGraph extends StreamingPlan { sources.add(source.getId()); setParallelism(source.getId(), parallelism); setMaxParallelism(source.getId(), maxParallelism); + setResources(source.getId(), minResources, preferredResources); StreamNode sink = this.addNode(sinkId, null, http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index de87a66..f4d4071 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -203,7 +203,7 @@ public class StreamGraphGenerator { } if (transform.getMinResources() != null && transform.getPreferredResources() != null) { - streamGraph.setResource(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); + streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); } return transformedIds; @@ -335,7 +335,9 @@ public class StreamGraphGenerator { getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), - iterate.getMaxParallelism()); + iterate.getMaxParallelism(), + iterate.getMinResources(), + iterate.getPreferredResources() ); StreamNode itSource = itSourceAndSink.f0; StreamNode itSink = itSourceAndSink.f1; @@ -400,7 +402,9 @@ public class StreamGraphGenerator { getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), - coIterate.getMaxParallelism()); + coIterate.getMaxParallelism(), + coIterate.getMinResources(), + coIterate.getPreferredResources()); StreamNode itSource = itSourceAndSink.f0; 
StreamNode itSink = itSourceAndSink.f1; http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 2d2e1e75..e4164ba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -49,8 +49,8 @@ public class StreamNode implements Serializable { * dynamic scaling and the number of key groups used for partitioned state. */ private int maxParallelism; - private ResourceSpec minResources; - private ResourceSpec preferredResources; + private ResourceSpec minResources = ResourceSpec.DEFAULT; + private ResourceSpec preferredResources = ResourceSpec.DEFAULT; private Long bufferTimeout = null; private final String operatorName; private String slotSharingGroup; http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index c18b527..5c1e1ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.commons.lang3.StringUtils; import 
org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; @@ -85,6 +86,9 @@ public class StreamingJobGraphGenerator { private Map<Integer, StreamConfig> vertexConfigs; private Map<Integer, String> chainedNames; + private Map<Integer, ResourceSpec> chainedMinResources; + private Map<Integer, ResourceSpec> chainedPreferredResources; + private final StreamGraphHasher defaultStreamGraphHasher; private final List<StreamGraphHasher> legacyStreamGraphHashers; @@ -100,6 +104,8 @@ public class StreamingJobGraphGenerator { this.chainedConfigs = new HashMap<>(); this.vertexConfigs = new HashMap<>(); this.chainedNames = new HashMap<>(); + this.chainedMinResources = new HashMap<>(); + this.chainedPreferredResources = new HashMap<>(); this.physicalEdgesInOrder = new ArrayList<>(); } @@ -211,6 +217,8 @@ public class StreamingJobGraphGenerator { } chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); + chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); + chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs)); StreamConfig config = currentNodeId.equals(startNodeId) ? 
createJobVertex(startNodeId, hashes, legacyHashes) @@ -269,6 +277,22 @@ public class StreamingJobGraphGenerator { } } + private ResourceSpec createChainedMinResources(Integer vertexID, List<StreamEdge> chainedOutputs) { + ResourceSpec minResources = streamGraph.getStreamNode(vertexID).getMinResources(); + for (StreamEdge chainable : chainedOutputs) { + minResources = minResources.merge(chainedMinResources.get(chainable.getTargetId())); + } + return minResources; + } + + private ResourceSpec createChainedPreferredResources(Integer vertexID, List<StreamEdge> chainedOutputs) { + ResourceSpec preferredResources = streamGraph.getStreamNode(vertexID).getPreferredResources(); + for (StreamEdge chainable : chainedOutputs) { + preferredResources = preferredResources.merge(chainedPreferredResources.get(chainable.getTargetId())); + } + return preferredResources; + } + private StreamConfig createJobVertex( Integer streamNodeId, Map<Integer, byte[]> hashes, @@ -308,6 +332,8 @@ public class StreamingJobGraphGenerator { legacyJobVertexIds); } + jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId)); + jobVertex.setInvokableClass(streamNode.getJobVertexClass()); int parallelism = streamNode.getParallelism(); http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index 24b5736..e86b3e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ 
-132,13 +132,13 @@ public abstract class StreamTransformation<T> { * The minimum resources for this stream transformation. It defines the lower limit for * dynamic resources resize in future plan. */ - private ResourceSpec minResources = ResourceSpec.UNKNOWN; + private ResourceSpec minResources = ResourceSpec.DEFAULT; /** * The preferred resources for this stream transformation. It defines the upper limit for * dynamic resource resize in future plan. */ - private ResourceSpec preferredResources = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResources = ResourceSpec.DEFAULT; /** * User-specified ID for this transformation. This is used to assign the http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index b4d2421..5660655 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api; +import java.lang.reflect.Method; import java.util.List; import org.apache.flink.api.common.InvalidProgramException; @@ -26,6 +27,7 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -516,11 +518,10 @@ public class 
DataStreamTest { } /** - * Tests whether resource gets set. + * Tests whether resources get set. */ - /* @Test - public void testResource() { + public void testResources() throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ResourceSpec minResource1 = new ResourceSpec(1.0, 100); @@ -542,23 +543,35 @@ public class DataStreamTest { ResourceSpec preferredResource6 = new ResourceSpec(2.0, 700); ResourceSpec minResource7 = new ResourceSpec(1.0, 700); - ResourceSpec maxResource7 = new ResourceSpec(2.0, 800); + ResourceSpec preferredResource7 = new ResourceSpec(2.0, 800); + + Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class); + opMethod.setAccessible(true); + + Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class); + sinkMethod.setAccessible(true); + + DataStream<Long> source1 = env.generateSequence(0, 0); + opMethod.invoke(source1, minResource1, preferredResource1); - DataStream<Long> source1 = env.generateSequence(0, 0).setResource(minResource1, preferredResource1); DataStream<Long> map1 = source1.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return null; } - }).setResource(minResource2, preferredResource2); + }); + opMethod.invoke(map1, minResource2, preferredResource2); + + DataStream<Long> source2 = env.generateSequence(0, 0); + opMethod.invoke(source2, minResource3, preferredResource3); - DataStream<Long> source2 = env.generateSequence(0, 0).setResource(minResource3, preferredResource3); DataStream<Long> map2 = source2.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return null; } - }).setResource(minResource4, preferredResource4); + }); + opMethod.invoke(map2, minResource4, preferredResource4); DataStream<Long> connected = map1.connect(map2) .flatMap(new CoFlatMapFunction<Long, Long, 
Long>() { @@ -568,7 +581,8 @@ public class DataStreamTest { @Override public void flatMap2(Long value, Collector<Long> out) throws Exception { } - }).setResource(minResource5, preferredResource5); + }); + opMethod.invoke(connected, minResource5, preferredResource5); DataStream<Long> windowed = connected .windowAll(GlobalWindows.create()) @@ -580,31 +594,33 @@ public class DataStreamTest { public Long fold(Long accumulator, Long value) throws Exception { return null; } - }).setResource(minResource6, preferredResource6); + }); + opMethod.invoke(windowed, minResource6, preferredResource6); - DataStreamSink<Long> sink = windowed.print().setResource(minResource7, maxResource7); + DataStreamSink<Long> sink = windowed.print(); + sinkMethod.invoke(sink, minResource7, preferredResource7); - assertEquals(minResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMinResource()); - assertEquals(preferredResource1, env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResource()); + assertEquals(minResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMinResources()); + assertEquals(preferredResource1, env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResources()); - assertEquals(minResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMinResource()); - assertEquals(preferredResource2, env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResource()); + assertEquals(minResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMinResources()); + assertEquals(preferredResource2, env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResources()); - assertEquals(minResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMinResource()); - assertEquals(preferredResource3, env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResource()); + assertEquals(minResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMinResources()); + assertEquals(preferredResource3, 
env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResources()); - assertEquals(minResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMinResource()); - assertEquals(preferredResource4, env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResource()); + assertEquals(minResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMinResources()); + assertEquals(preferredResource4, env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResources()); - assertEquals(minResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMinResource()); - assertEquals(preferredResource5, env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResource()); + assertEquals(minResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMinResources()); + assertEquals(preferredResource5, env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResources()); - assertEquals(minResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMinResource()); - assertEquals(preferredResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResource()); + assertEquals(minResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMinResources()); + assertEquals(preferredResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResources()); - assertEquals(minResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResource()); - assertEquals(maxResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResource()); - }*/ + assertEquals(minResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResources()); + assertEquals(preferredResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResources()); + } @Test public void testTypeInfo() { 
http://git-wip-us.apache.org/repos/asf/flink/blob/980d072f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 5f1973c..6d2fcaa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -17,19 +17,29 @@ package org.apache.flink.streaming.api.graph; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.IterativeStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.Test; 
+import java.lang.reflect.Method; import java.util.List; import java.util.Map; @@ -139,7 +149,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration()); StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration()); Map<Integer, StreamConfig> chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader()); - StreamConfig printConfig = chainedConfigs.get(3); + StreamConfig printConfig = chainedConfigs.values().iterator().next(); assertTrue(sourceConfig.isChainStart()); assertTrue(sourceConfig.isChainEnd()); @@ -150,4 +160,151 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertFalse(printConfig.isChainStart()); assertTrue(printConfig.isChainEnd()); } + + /** + * Verifies that the resources are merged correctly for chained operators (covers source and sink cases) + * when generating job graph + */ + @Test + public void testResourcesForChainedSourceSink() throws Exception { + ResourceSpec resource1 = new ResourceSpec(0.1, 100); + ResourceSpec resource2 = new ResourceSpec(0.2, 200); + ResourceSpec resource3 = new ResourceSpec(0.3, 300); + ResourceSpec resource4 = new ResourceSpec(0.4, 400); + ResourceSpec resource5 = new ResourceSpec(0.5, 500); + + Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class); + opMethod.setAccessible(true); + + Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class); + sinkMethod.setAccessible(true); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<Integer, Integer>> source = env.addSource(new ParallelSourceFunction<Tuple2<Integer, Integer>>() { + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + } + + @Override + public void cancel() { + } + }); + opMethod.invoke(source, resource1); + + 
DataStream<Tuple2<Integer, Integer>> map = source.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception { + return value; + } + }); + opMethod.invoke(map, resource2); + + // CHAIN(Source -> Map -> Filter) + DataStream<Tuple2<Integer, Integer>> filter = map.filter(new FilterFunction<Tuple2<Integer, Integer>>() { + @Override + public boolean filter(Tuple2<Integer, Integer> value) throws Exception { + return false; + } + }); + opMethod.invoke(filter, resource3); + + DataStream<Tuple2<Integer, Integer>> reduce = filter.keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() { + @Override + public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception { + return new Tuple2<>(value1.f0, value1.f1 + value2.f1); + } + }); + opMethod.invoke(reduce, resource4); + + DataStreamSink<Tuple2<Integer, Integer>> sink = reduce.addSink(new SinkFunction<Tuple2<Integer, Integer>>() { + @Override + public void invoke(Tuple2<Integer, Integer> value) throws Exception { + } + }); + sinkMethod.invoke(sink, resource5); + + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); + + JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); + JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); + + assertTrue(sourceMapFilterVertex.getMinResources().equals(resource1.merge(resource2).merge(resource3))); + assertTrue(reduceSinkVertex.getPreferredResources().equals(resource4.merge(resource5))); + } + + /** + * Verifies that the resources are merged correctly for chained operators (covers middle chaining and iteration cases) + * when generating job graph + */ + @Test + public void testResourcesForIteration() throws Exception { + ResourceSpec resource1 = new ResourceSpec(0.1, 100); + ResourceSpec resource2 = 
new ResourceSpec(0.2, 200); + ResourceSpec resource3 = new ResourceSpec(0.3, 300); + ResourceSpec resource4 = new ResourceSpec(0.4, 400); + ResourceSpec resource5 = new ResourceSpec(0.5, 500); + + Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class); + opMethod.setAccessible(true); + + Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class); + sinkMethod.setAccessible(true); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Integer> source = env.addSource(new ParallelSourceFunction<Integer>() { + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + } + + @Override + public void cancel() { + } + }).name("test_source"); + opMethod.invoke(source, resource1); + + IterativeStream<Integer> iteration = source.iterate(3000); + opMethod.invoke(iteration, resource2); + + DataStream<Integer> flatMap = iteration.flatMap(new FlatMapFunction<Integer, Integer>() { + @Override + public void flatMap(Integer value, Collector<Integer> out) throws Exception { + out.collect(value); + } + }).name("test_flatMap"); + opMethod.invoke(flatMap, resource3); + + // CHAIN(flatMap -> Filter) + DataStream<Integer> increment = flatMap.filter(new FilterFunction<Integer>() { + @Override + public boolean filter(Integer value) throws Exception { + return false; + } + }).name("test_filter"); + opMethod.invoke(increment, resource4); + + DataStreamSink<Integer> sink = iteration.closeWith(increment).addSink(new SinkFunction<Integer>() { + @Override + public void invoke(Integer value) throws Exception { + } + }).disableChaining().name("test_sink"); + sinkMethod.invoke(sink, resource5); + + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); + + for (JobVertex jobVertex : jobGraph.getVertices()) { + if (jobVertex.getName().contains("test_source")) { + 
assertTrue(jobVertex.getMinResources().equals(resource1)); + } else if (jobVertex.getName().contains("Iteration_Source")) { + assertTrue(jobVertex.getPreferredResources().equals(resource2)); + } else if (jobVertex.getName().contains("test_flatMap")) { + assertTrue(jobVertex.getMinResources().equals(resource3.merge(resource4))); + } else if (jobVertex.getName().contains("Iteration_Tail")) { + assertTrue(jobVertex.getPreferredResources().equals(ResourceSpec.DEFAULT)); + } else if (jobVertex.getName().contains("test_sink")) { + assertTrue(jobVertex.getMinResources().equals(resource5)); + } + } + } }