[FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2429a7b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2429a7b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2429a7b1

Branch: refs/heads/release-1.5
Commit: 2429a7b1acc1efa99960ccef6623ba9e4141c0e5
Parents: ea54384
Author: zhouhai02 <zhouha...@meituan.com>
Authored: Wed Mar 21 00:01:54 2018 +0800
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 25 09:33:24 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/client/LocalExecutor.java  |  7 ++++---
 .../flink/client/deployment/ClusterSpecification.java     |  3 +--
 .../kinesis/manualtests/ManualExactlyOnceTest.java        |  2 +-
 .../ManualExactlyOnceWithStreamReshardingTest.java        |  2 +-
 .../org/apache/flink/storm/api/FlinkLocalCluster.java     |  3 +--
 .../flink/runtime/clusterframework/BootstrapTools.java    |  2 +-
 .../runtime/minicluster/MiniClusterConfiguration.java     |  3 ++-
 .../runtime/taskexecutor/TaskManagerConfiguration.java    |  2 +-
 .../taskexecutor/TaskManagerServicesConfiguration.java    |  4 ++--
 .../flink/runtime/checkpoint/CoordinatorShutdownTest.java |  5 +++--
 .../flink/runtime/jobmanager/JobManagerCleanupITCase.java |  3 ++-
 .../runtime/jobmanager/JobManagerHARecoveryTest.java      |  4 ++--
 .../apache/flink/runtime/jobmanager/JobManagerTest.java   |  5 ++---
 .../leaderelection/LeaderChangeJobRecoveryTest.java       |  3 ++-
 .../leaderelection/LeaderChangeStateCleanupTest.java      |  3 ++-
 .../backpressure/BackPressureStatsTrackerImplITCase.java  |  4 ++--
 .../backpressure/StackTraceSampleCoordinatorITCase.java   |  4 ++--
 .../org/apache/flink/runtime/akka/AkkaSslITCase.scala     | 10 +++++-----
 .../apache/flink/runtime/jobmanager/RecoveryITCase.scala  |  4 ++--
 .../apache/flink/runtime/testingUtils/TestingUtils.scala  |  2 +-
 .../org/apache/flink/api/scala/ScalaShellITCase.scala     |  4 ++--
 .../org/apache/flink/test/util/MiniClusterResource.java   |  2 +-
 .../java/org/apache/flink/test/util/TestBaseUtils.java    |  2 +-
 .../checkpointing/utils/SavepointMigrationTestBase.java   |  3 ++-
 .../flink/test/operators/ExecutionEnvironmentITCase.java  |  4 ++--
 .../flink/test/operators/RemoteEnvironmentITCase.java     |  4 ++--
 .../AbstractTaskManagerProcessFailureRecoveryTest.java    |  2 +-
 .../recovery/JobManagerHACheckpointRecoveryITCase.java    |  3 ++-
 .../JobManagerHAProcessFailureBatchRecoveryITCase.java    |  3 +--
 .../test/recovery/TaskManagerFailureRecoveryITCase.java   |  2 +-
 .../leaderelection/ZooKeeperLeaderElectionITCase.java     |  3 ++-
 .../runtime/minicluster/LocalFlinkMiniClusterITCase.java  |  3 ++-
 .../scala/runtime/jobmanager/JobManagerFailsITCase.scala  |  2 +-
 .../runtime/taskmanager/TaskManagerFailsITCase.scala      |  5 ++---
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  3 +--
 35 files changed, 62 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 01c281f..f837c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -139,7 +140,7 @@ public class LocalExecutor extends PlanExecutor {
                                .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
                                .setNumSlotsPerTaskManager(
                                        configuration.getInteger(
-                                               ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1))
+                                               TaskManagerOptions.NUM_TASK_SLOTS, 1))
                                .build();
 
                        final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
@@ -220,7 +221,7 @@ public class LocalExecutor extends PlanExecutor {
 
                        try {
                                // TODO: Set job's default parallelism to max number of slots
-                               final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+                               final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
 
@@ -265,7 +266,7 @@ public class LocalExecutor extends PlanExecutor {
 
        private Configuration createConfiguration() {
                Configuration newConfiguration = new Configuration();
-               newConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+               newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
                newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
 
                newConfiguration.addAll(baseConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 8650cab..cf2ae4c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.deployment;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -66,7 +65,7 @@ public final class ClusterSpecification {
        }
 
        public static ClusterSpecification fromConfiguration(Configuration configuration) {
-               int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+               int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
                int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
                int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 963002f..40225fb 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -78,7 +78,7 @@ public class ManualExactlyOnceTest {
 
                final Configuration flinkConfig = new Configuration();
                flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-               flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
                flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
                flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 93b9caf..34dcdc0 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -90,7 +90,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 
                final Configuration flinkConfig = new Configuration();
                flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-               flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
                flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
                flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index bff8c80..6b0b503 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.storm.api;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -90,7 +89,7 @@ public class FlinkLocalCluster {
                        configuration.addAll(jobGraph.getJobConfiguration());
 
                        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
-                       configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+                       configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
                        this.flink = new LocalFlinkMiniCluster(configuration, true);
                        this.flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 102274d1..7a8403a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -247,7 +247,7 @@ public class BootstrapTools {
 
                cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
                if (numSlots != -1){
-                       cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+                       cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
                }
 
                return cfg; 

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 44a567b..0a0c692 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
@@ -167,7 +168,7 @@ public class MiniClusterConfiguration {
 
                public MiniClusterConfiguration build() {
                        final Configuration modifiedConfiguration = new Configuration(configuration);
-                       modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+                       modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager);
                        modifiedConfiguration.setString(
                                RestOptions.ADDRESS,
                                modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1bf42ee..e8a7ae8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -162,7 +162,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
        // --------------------------------------------------------------------------------------------
 
        public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
-               int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+               int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
                if (numberSlots == -1) {
                        numberSlots = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index d029bc5..b80320c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -196,7 +196,7 @@ public class TaskManagerServicesConfiguration {
                        boolean localCommunication) throws Exception {
 
                // we need this because many configs have been written with a "-1" entry
-               int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+               int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
                if (slots == -1) {
                        slots = 1;
                }
@@ -290,7 +290,7 @@ public class TaskManagerServicesConfiguration {
                checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
                        "Leave config parameter empty or use 0 to let the system choose a port automatically.");
 
-               checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+               checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
                        "Number of task slots must be at least one.");
 
                final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 1d44444..8a6a9d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.execution.Environment;
@@ -59,7 +60,7 @@ public class CoordinatorShutdownTest extends TestLogger {
                try {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
                        cluster = new LocalFlinkMiniCluster(config, true);
                        cluster.start();
                        
@@ -128,7 +129,7 @@ public class CoordinatorShutdownTest extends TestLogger {
                try {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
                        cluster = new LocalFlinkMiniCluster(config, true);
                        cluster.start();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
index 8806dec..33a4a28 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -146,7 +147,7 @@ public class JobManagerCleanupITCase extends TestLogger {
 
                                        try {
                                                Configuration config = new 
Configuration();
-                                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+                                               
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
                                                
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                                                
config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
                                                
config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
blobBaseDir.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index bb2dbf7..d991983 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -174,7 +174,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
                flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
                
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
-               
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slots);
+               
flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
                flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 
3_600L);
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 417294c..79e6d20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -641,7 +640,7 @@ public class JobManagerTest extends TestLogger {
 
                Configuration tmConfig = new Configuration();
                tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-               
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 
                ActorRef taskManager = 
TaskManager.startTaskManagerComponentsAndActor(
                        tmConfig,
@@ -1300,7 +1299,7 @@ public class JobManagerTest extends TestLogger {
                        archiver = new AkkaActorGateway(master._2(), leaderId);
 
                        Configuration tmConfig = new Configuration();
-                       
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+                       tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
4);
 
                        ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
                                tmConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 942fcf3..2ebaeba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -72,7 +73,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
+               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlotsPerTM);
 
                configuration.setString(ConfigConstants.RESTART_STRATEGY, 
"fixeddelay");
                
configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
9999);

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index bbcbbf0..6880d9b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -78,7 +79,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
+               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlotsPerTM);
 
                highAvailabilityServices = new 
TestingManualHighAvailabilityServices();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 994d02e..0e837a5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -138,7 +138,7 @@ public class BackPressureStatsTrackerImplITCase extends 
TestLogger {
                                        config,
                                        highAvailabilityServices);
 
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+                               
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 
                                taskManager = TestingUtils.createTaskManager(
                                        testActorSystem,

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
index 8fa302a..ccf5f60 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -109,7 +109,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                                        config,
                                        highAvailabilityServices);
 
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+                               
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 
                                taskManager = TestingUtils.createTaskManager(
                                        testActorSystem,

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 72596cd..ebdae31 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -51,7 +51,7 @@ class AkkaSslITCase(_system: ActorSystem)
       val config = new Configuration()
       config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
       config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
       config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -78,7 +78,7 @@ class AkkaSslITCase(_system: ActorSystem)
         val config = new Configuration()
         config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
         config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 
"127.0.0.1")
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
         config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -101,7 +101,7 @@ class AkkaSslITCase(_system: ActorSystem)
     "start with akka ssl disabled" in {
 
       val config = new Configuration()
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
       config.setBoolean(SecurityOptions.SSL_ENABLED, false)
 
@@ -117,7 +117,7 @@ class AkkaSslITCase(_system: ActorSystem)
       an[Exception] should be thrownBy {
 
         val config = new Configuration()
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 
@@ -139,7 +139,7 @@ class AkkaSslITCase(_system: ActorSystem)
       an[Exception] should be thrownBy {
 
         val config = new Configuration()
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 71d2ee9..a2dffc8 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, 
Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, 
Configuration, TaskManagerOptions}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, 
JobStatus, JobVertex}
@@ -59,7 +59,7 @@ class RecoveryITCase(_system: ActorSystem)
       heartbeatTimeout: String)
     : TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskManagers)
     config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
     new TestingCluster(config)

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2d8d02d..b89d2a6 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -88,7 +88,7 @@ object TestingUtils {
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
                           timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): 
TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
     config.setString(AkkaOptions.ASK_TIMEOUT, timeout)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index cb6231a..2e07fb9 100644
--- 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,8 +22,8 @@ import java.io._
 
 import akka.actor.ActorRef
 import akka.pattern.Patterns
+import org.apache.flink.configuration.{Configuration, CoreOptions, 
TaskManagerOptions}
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration, 
CoreOptions, GlobalConfiguration}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
@@ -321,7 +321,7 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
parallelism)
+    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
     configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
 
     cluster = Option(new StandaloneMiniCluster(configuration))

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8c21b37..844de01 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -198,7 +198,7 @@ public class MiniClusterResource extends ExternalResource {
        private void startLegacyMiniCluster() throws Exception {
                final Configuration configuration = new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
miniClusterResourceConfiguration.getNumberTaskManagers());
-               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
+               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
 
                final LocalFlinkMiniCluster flinkMiniCluster = 
TestBaseUtils.startCluster(
                        configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index dd255fd..7e9b12e 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -127,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
                Configuration config = new Configuration();
 
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskManagers);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
taskManagerNumSlots);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
taskManagerNumSlots);
 
                config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, 
startWebserver);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 8f2aaa1..cfa155b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -99,7 +100,7 @@ public abstract class SavepointMigrationTestBase extends 
TestBaseUtils {
                final Configuration config = new Configuration();
 
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
 
                UUID id = UUID.randomUUID();
                final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" 
+ id).getAbsoluteFile();

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
index f7f993b..65a21d1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -50,7 +50,7 @@ public class ExecutionEnvironmentITCase extends TestLogger {
        @Test
        public void testLocalEnvironmentWithConfig() throws Exception {
                Configuration conf = new Configuration();
-               conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM);
+               conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(conf);
                env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index a3a551c..c2d6341 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -96,7 +96,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
 
                        resource = miniCluster;
                } else {
-                       
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+                       
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
                        final StandaloneMiniCluster standaloneMiniCluster = new 
StandaloneMiniCluster(configuration);
                        hostname = standaloneMiniCluster.getHostname();
                        port = standaloneMiniCluster.getPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 29516dc..6764f6f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -406,7 +406,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                cfg.setInteger(JobManagerOptions.PORT, 
jobManagerPort);
                                
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
                                
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+                               
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
                                cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
 
                                
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 16ea6d5..ce750d3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -365,7 +366,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                        
config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 
retainedCheckpoints);
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlots);
                        
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
temporaryFolder.newFolder().toString());
 
                        String tmpFolderString = 
temporaryFolder.newFolder().toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d217a2a..50404a4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -270,7 +269,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                        // Task manager configuration
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
4L);
                        
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 
                        highAvailabilityServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                                config,

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 8371230..755b3d3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -77,7 +77,7 @@ public class TaskManagerFailureRecoveryITCase extends 
TestLogger {
                try {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
parallelism);
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
 
                        config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, 
"500 ms");

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index f348b8a..399fc10 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -157,7 +158,7 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
 
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
+               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlotsPerTM);
 
                // we "effectively" disable the automatic RecoverAllJobs 
message and sent it manually to make
                // sure that all TMs have registered to the JM prior to issuing 
the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 770b88c..b3d92df 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -76,7 +77,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
                try {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlots);
                        miniCluster = new LocalFlinkMiniCluster(config, true);
 
                        miniCluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5fe7b1d..5c9b1fb 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -133,7 +133,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
   def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): 
TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
     config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index a065e5b..69ad2d7 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,8 +20,7 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
TaskManagerOptions}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
@@ -243,7 +242,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
   def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): 
TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskmanagers)
 
     new TestingCluster(config, singleActorSystem = false)

http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fe04662..7596d68 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -500,7 +499,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                }
 
                if (commandLine.hasOption(slots.getOpt())) {
-                       
effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
+                       
effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
                }
 
                if (isYarnPropertiesFileMode(commandLine)) {

Reply via email to