Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig"
This reverts commit e4fbae36207c563363eed39886c24eea51d7db01. The fixes around FLINK-5808 introduced follow-up issues. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a13750cc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a13750cc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a13750cc Branch: refs/heads/master Commit: a13750ccf7bf2fc8c15746d891d8d6959ea073ad Parents: 5b4dd41 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Apr 4 14:03:26 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Apr 18 17:42:10 2017 +0200 ---------------------------------------------------------------------- .../apache/flink/api/common/ExecutionConfig.java | 9 --------- .../runtime/executiongraph/ExecutionJobVertex.java | 6 +++--- .../runtime/executiongraph/ExecutionVertex.java | 4 ++-- .../runtime/state/KeyGroupRangeAssignment.java | 16 ++++++++++++---- .../api/environment/StreamExecutionEnvironment.java | 5 +++-- .../streaming/api/graph/StreamGraphGenerator.java | 6 +++--- 6 files changed, 23 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a13750cc/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 9af9cff..26e6af1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -83,15 +83,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut */ public static final int PARALLELISM_UNKNOWN = -2; - /** - * The default lower bound for max parallelism if 
nothing was configured by the user. We have this to allow users - * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. - */ - public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; - - /** The (inclusive) upper bound for max parallelism */ - public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; - private static final long DEFAULT_RESTART_DELAY = 10000L; // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a13750cc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index c9b25bf..5fbce4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -225,13 +225,13 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable private void setMaxParallelismInternal(int maxParallelism) { if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { - maxParallelism = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM; + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; } Preconditions.checkArgument(maxParallelism > 0 - && maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, "Overriding max parallelism is not in valid bounds (1..%s), found: %s", - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, maxParallelism); + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, maxParallelism); this.maxParallelism = maxParallelism; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/a13750cc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 90820e9..3f6ce88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.Archiveable; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.JobException; @@ -41,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; @@ -671,7 +671,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi //TODO this case only exists for test, currently there has to be exactly one consumer in real jobs! 
producedPartitions.add(ResultPartitionDeploymentDescriptor.from( partition, - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, lazyScheduling)); } else { Preconditions.checkState(1 == consumers.size(), http://git-wip-us.apache.org/repos/asf/flink/blob/a13750cc/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java index bf0611b..62bf3f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java @@ -18,12 +18,20 @@ package org.apache.flink.runtime.state; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; public final class KeyGroupRangeAssignment { + /** + * The default lower bound for max parallelism if nothing was configured by the user. We have this to allow users + * some degree of scale-up in case they forgot to configure maximum parallelism explicitly. 
+ */ + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; + + /** The (inclusive) upper bound for max parallelism */ + public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; + private KeyGroupRangeAssignment() { throw new AssertionError(); } @@ -122,13 +130,13 @@ public final class KeyGroupRangeAssignment { return Math.min( Math.max( MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)), - ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM), - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM); + DEFAULT_LOWER_BOUND_MAX_PARALLELISM), + UPPER_BOUND_MAX_PARALLELISM); } public static void checkParallelismPreconditions(int parallelism) { Preconditions.checkArgument(parallelism > 0 - && parallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + && parallelism <= UPPER_BOUND_MAX_PARALLELISM, "Operator parallelism not within bounds: " + parallelism); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a13750cc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 1801315..b6540dc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -49,6 +49,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.CheckpointingMode; import 
org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -185,9 +186,9 @@ public abstract class StreamExecutionEnvironment { */ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) { Preconditions.checkArgument(maxParallelism > 0 && - maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, + maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, "maxParallelism is out of bounds 0 < maxParallelism <= " + - ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); config.setMaxParallelism(maxParallelism); return this; http://git-wip-us.apache.org/repos/asf/flink/blob/a13750cc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index df10ae4..9fc8dd8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation; @@ -79,8 +79,8 @@ 
public class StreamGraphGenerator { private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class); - public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; - public static final int UPPER_BOUND_MAX_PARALLELISM = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM; + public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; + public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; // The StreamGraph that is being built, this is initialized at the beginning. private final StreamGraph streamGraph;