[FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2429a7b1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2429a7b1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2429a7b1 Branch: refs/heads/release-1.5 Commit: 2429a7b1acc1efa99960ccef6623ba9e4141c0e5 Parents: ea54384 Author: zhouhai02 <zhouha...@meituan.com> Authored: Wed Mar 21 00:01:54 2018 +0800 Committer: zentol <ches...@apache.org> Committed: Wed Apr 25 09:33:24 2018 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/flink/client/LocalExecutor.java | 7 ++++--- .../flink/client/deployment/ClusterSpecification.java | 3 +-- .../kinesis/manualtests/ManualExactlyOnceTest.java | 2 +- .../ManualExactlyOnceWithStreamReshardingTest.java | 2 +- .../org/apache/flink/storm/api/FlinkLocalCluster.java | 3 +-- .../flink/runtime/clusterframework/BootstrapTools.java | 2 +- .../runtime/minicluster/MiniClusterConfiguration.java | 3 ++- .../runtime/taskexecutor/TaskManagerConfiguration.java | 2 +- .../taskexecutor/TaskManagerServicesConfiguration.java | 4 ++-- .../flink/runtime/checkpoint/CoordinatorShutdownTest.java | 5 +++-- .../flink/runtime/jobmanager/JobManagerCleanupITCase.java | 3 ++- .../runtime/jobmanager/JobManagerHARecoveryTest.java | 4 ++-- .../apache/flink/runtime/jobmanager/JobManagerTest.java | 5 ++--- .../leaderelection/LeaderChangeJobRecoveryTest.java | 3 ++- .../leaderelection/LeaderChangeStateCleanupTest.java | 3 ++- .../backpressure/BackPressureStatsTrackerImplITCase.java | 4 ++-- .../backpressure/StackTraceSampleCoordinatorITCase.java | 4 ++-- .../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 10 +++++----- .../apache/flink/runtime/jobmanager/RecoveryITCase.scala | 4 ++-- .../apache/flink/runtime/testingUtils/TestingUtils.scala | 2 +- .../org/apache/flink/api/scala/ScalaShellITCase.scala | 4 ++-- .../org/apache/flink/test/util/MiniClusterResource.java | 2 +- .../java/org/apache/flink/test/util/TestBaseUtils.java | 2 +- .../checkpointing/utils/SavepointMigrationTestBase.java | 3 ++- .../flink/test/operators/ExecutionEnvironmentITCase.java | 4 ++-- .../flink/test/operators/RemoteEnvironmentITCase.java | 4 ++-- .../AbstractTaskManagerProcessFailureRecoveryTest.java | 2 +- .../recovery/JobManagerHACheckpointRecoveryITCase.java | 3 ++- .../JobManagerHAProcessFailureBatchRecoveryITCase.java | 3 +-- .../test/recovery/TaskManagerFailureRecoveryITCase.java | 2 +- .../leaderelection/ZooKeeperLeaderElectionITCase.java | 3 ++- .../runtime/minicluster/LocalFlinkMiniClusterITCase.java | 3 ++- .../scala/runtime/jobmanager/JobManagerFailsITCase.scala | 2 +- .../runtime/taskmanager/TaskManagerFailsITCase.scala | 5 ++--- .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 3 +-- 35 files changed, 62 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index 01c281f..f837c4f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.dag.DataSinkNode; @@ -139,7 +140,7 @@ public class LocalExecutor extends PlanExecutor { .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED) .setNumSlotsPerTaskManager( configuration.getInteger( - ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)) + TaskManagerOptions.NUM_TASK_SLOTS, 1)) .build(); final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); @@ -220,7 +221,7 @@ public class LocalExecutor extends PlanExecutor { try { // TODO: Set job's default parallelism to max number of slots - final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots); + final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots); final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers); @@ -265,7 +266,7 @@ public class LocalExecutor extends PlanExecutor { private Configuration createConfiguration() { Configuration newConfiguration = new Configuration(); - newConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots()); + newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots()); newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles()); newConfiguration.addAll(baseConfiguration); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java index 8650cab..cf2ae4c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java @@ -18,7 +18,6 @@ package org.apache.flink.client.deployment; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -66,7 +65,7 @@ public final class ClusterSpecification { } public static ClusterSpecification fromConfiguration(Configuration configuration) { - int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java index 963002f..40225fb 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java @@ -78,7 +78,7 @@ public class ManualExactlyOnceTest { final Configuration flinkConfig = new Configuration(); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java index 93b9caf..34dcdc0 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java @@ -90,7 +90,7 @@ public class ManualExactlyOnceWithStreamReshardingTest { final Configuration flinkConfig = new Configuration(); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java index bff8c80..6b0b503 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java @@ -18,7 +18,6 @@ package org.apache.flink.storm.api; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -90,7 +89,7 @@ public class FlinkLocalCluster { configuration.addAll(jobGraph.getJobConfiguration()); configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); this.flink = new LocalFlinkMiniCluster(configuration, true); this.flink.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 102274d1..7a8403a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -247,7 +247,7 @@ public class BootstrapTools { cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString()); if (numSlots != -1){ - cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); + cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots); } return cfg; http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index 44a567b..0a0c692 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.util.Preconditions; @@ -167,7 +168,7 @@ public class MiniClusterConfiguration { public MiniClusterConfiguration build() { final Configuration modifiedConfiguration = new Configuration(configuration); - modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); + modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager); modifiedConfiguration.setString( RestOptions.ADDRESS, modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost")); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index 1bf42ee..e8a7ae8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -162,7 +162,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { // -------------------------------------------------------------------------------------------- public static TaskManagerConfiguration fromConfiguration(Configuration configuration) { - int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); if (numberSlots == -1) { numberSlots = 1; http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index d029bc5..b80320c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -196,7 +196,7 @@ public class TaskManagerServicesConfiguration { boolean localCommunication) throws Exception { // we need this because many configs have been written with a "-1" entry - int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); if (slots == -1) { slots = 1; } @@ -290,7 +290,7 @@ public class TaskManagerServicesConfiguration { checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), "Leave config parameter empty or use 0 to let the system choose a port automatically."); - checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, + checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(), "Number of task slots must be at least one."); final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 1d44444..8a6a9d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.execution.Environment; @@ -59,7 +60,7 @@ public class CoordinatorShutdownTest extends TestLogger { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); cluster = new LocalFlinkMiniCluster(config, true); cluster.start(); @@ -128,7 +129,7 @@ public class CoordinatorShutdownTest extends TestLogger { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); cluster = new LocalFlinkMiniCluster(config, true); cluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java index 8806dec..33a4a28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; @@ -146,7 +147,7 @@ public class JobManagerCleanupITCase extends TestLogger { try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT()); config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index bb2dbf7..d991983 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; @@ -174,7 +174,7 @@ public class JobManagerHARecoveryTest extends TestLogger { flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); - flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots); flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L); try { http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 417294c..79e6d20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -641,7 +640,7 @@ public class JobManagerTest extends TestLogger { Configuration tmConfig = new Configuration(); tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); - tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8); ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor( tmConfig, @@ -1300,7 +1299,7 @@ public class JobManagerTest extends TestLogger { archiver = new AkkaActorGateway(master._2(), leaderId); Configuration tmConfig = new Configuration(); - tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); + tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4); ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( tmConfig, http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java index 942fcf3..2ebaeba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; @@ -72,7 +73,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM); configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay"); configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index bbcbbf0..6880d9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; @@ -78,7 +79,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM); highAvailabilityServices = new TestingManualHighAvailabilityServices(); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java index 994d02e..0e837a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; @@ -138,7 +138,7 @@ public class BackPressureStatsTrackerImplITCase extends TestLogger { config, highAvailabilityServices); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism); taskManager = TestingUtils.createTaskManager( testActorSystem, http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java index 8fa302a..ccf5f60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; @@ -109,7 +109,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { config, highAvailabilityServices); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism); taskManager = TestingUtils.createTaskManager( testActorSystem, http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index 72596cd..ebdae31 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -51,7 +51,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setString(JobManagerOptions.ADDRESS, "127.0.0.1") config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1") - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setBoolean(SecurityOptions.SSL_ENABLED, true) @@ -78,7 +78,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setString(JobManagerOptions.ADDRESS, "127.0.0.1") config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1") - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setBoolean(SecurityOptions.SSL_ENABLED, true) @@ -101,7 +101,7 @@ class AkkaSslITCase(_system: ActorSystem) "start with akka ssl disabled" in { val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setBoolean(SecurityOptions.SSL_ENABLED, false) @@ -117,7 +117,7 @@ class AkkaSslITCase(_system: ActorSystem) an[Exception] should be thrownBy { val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") @@ -139,7 +139,7 @@ class AkkaSslITCase(_system: ActorSystem) an[Exception] should be thrownBy { val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index 71d2ee9..a2dffc8 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -22,7 +22,7 @@ import akka.actor.{ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.restartstrategy.RestartStrategies -import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, TaskManagerOptions} import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.io.network.partition.ResultPartitionType import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex} @@ -59,7 +59,7 @@ class RecoveryITCase(_system: ActorSystem) heartbeatTimeout: String) : TestingCluster = { val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers) config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) new TestingCluster(config) http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 2d8d02d..b89d2a6 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -88,7 +88,7 @@ object TestingUtils { def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = { val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs) config.setString(AkkaOptions.ASK_TIMEOUT, timeout) http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index cb6231a..2e07fb9 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -22,8 +22,8 @@ import java.io._ import akka.actor.ActorRef import akka.pattern.Patterns +import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions} import org.apache.flink.runtime.minicluster.StandaloneMiniCluster -import org.apache.flink.configuration.{ConfigConstants, Configuration, CoreOptions, GlobalConfiguration} import org.apache.flink.runtime.clusterframework.BootstrapTools import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.util.TestLogger @@ -321,7 +321,7 @@ object ScalaShellITCase { @BeforeClass def beforeAll(): Unit = { - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism) + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism) configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE) cluster = Option(new StandaloneMiniCluster(configuration)) http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 8c21b37..844de01 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -198,7 +198,7 @@ public class MiniClusterResource extends ExternalResource { private void startLegacyMiniCluster() throws Exception { final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration()); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers()); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager()); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager()); final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster( configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index dd255fd..7e9b12e 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -127,7 +127,7 @@ public class TestBaseUtils extends TestLogger { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index 8f2aaa1..cfa155b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -25,6 +25,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -99,7 +100,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { final Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM); UUID id = UUID.randomUUID(); final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile(); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java index f7f993b..65a21d1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java @@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -50,7 +50,7 @@ public class ExecutionEnvironmentITCase extends TestLogger { @Test public void testLocalEnvironmentWithConfig() throws Exception { Configuration conf = new Configuration(); - conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); + conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM); final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf); env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java index a3a551c..c2d6341 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java @@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -96,7 +96,7 @@ public class RemoteEnvironmentITCase extends TestLogger { resource = miniCluster; } else { - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS); final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration); hostname = standaloneMiniCluster.getHostname(); port = standaloneMiniCluster.getPort(); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 29516dc..6764f6f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -406,7 +406,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test cfg.setInteger(JobManagerOptions.PORT, jobManagerPort); cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 16ea6d5..ce750d3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; @@ -365,7 +366,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots); config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString()); String tmpFolderString = temporaryFolder.newFolder().toString(); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index d217a2a..50404a4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -270,7 +269,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { // Task manager configuration config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( config, http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java index 8371230..755b3d3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java @@ -77,7 +77,7 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms"); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index f348b8a..399fc10 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.instance.ActorGateway; @@ -157,7 +158,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM); // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // sure that all TMs have registered to the JM prior to issuing the RecoverAllJobs message http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index 770b88c..b3d92df 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; @@ -76,7 +77,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger { try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots); miniCluster = new LocalFlinkMiniCluster(config, true); miniCluster.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index 5fe7b1d..5c9b1fb 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -133,7 +133,7 @@ class JobManagerFailsITCase(_system: ActorSystem) def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = { val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) config.setInteger(JobManagerOptions.PORT, 0) config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms") http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index a065e5b..69ad2d7 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -20,8 +20,7 @@ package org.apache.flink.api.scala.runtime.taskmanager import akka.actor.{ActorSystem, Kill, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.ConfigConstants -import org.apache.flink.configuration.Configuration +import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.io.network.partition.ResultPartitionType @@ -243,7 +242,7 @@ class TaskManagerFailsITCase(_system: ActorSystem) def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = { val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) new TestingCluster(config, singleActorSystem = false) http://git-wip-us.apache.org/repos/asf/flink/blob/2429a7b1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index fe04662..7596d68 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliArgsException; import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; @@ -500,7 +499,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId } if (commandLine.hasOption(slots.getOpt())) { - effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt()))); + effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt()))); } if (isYarnPropertiesFileMode(commandLine)) {