Repository: flink Updated Branches: refs/heads/master 12bf7c1a0 -> 5eb0e38fb
http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index fa61acf..3e6702a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -46,10 +46,6 @@ public class BlobLibraryCacheManagerTest { @Test public void testLibraryCacheManagerCleanup() { - Configuration config = new Configuration(); - - config.setLong(ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, 1); - GlobalConfiguration.includeConfiguration(config); JobID jid = new JobID(); List<BlobKey> keys = new ArrayList<BlobKey>(); http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index f3b3507..2f5cc47 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -142,7 +142,7 @@ object FlinkShell { ): (String, Int, Option[Either[FlinkMiniCluster, ClusterClient]]) = { config.executionMode match { case ExecutionMode.LOCAL => // Local mode - val config = GlobalConfiguration.getConfiguration() + val config = GlobalConfiguration.loadConfiguration() config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) val miniCluster = new LocalFlinkMiniCluster(config, false) @@ -189,7 +189,7 @@ object FlinkShell { val conf = cluster match { case Some(Left(miniCluster)) => miniCluster.userConfiguration case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration - case None => GlobalConfiguration.getConfiguration + case None => GlobalConfiguration.loadConfiguration() } println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n") http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index 5721a61..ee1b264 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -380,13 +380,13 @@ object ScalaShellITCase { val repl = externalJars match { case Some(ej) => new FlinkILoop( host, port, - GlobalConfiguration.getConfiguration, + GlobalConfiguration.loadConfiguration(), Option(Array(ej)), in, new PrintWriter(out)) case None => new FlinkILoop( host, port, - GlobalConfiguration.getConfiguration, + GlobalConfiguration.loadConfiguration(), in, new PrintWriter(out)) } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 0332684..5b20447 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -42,7 +42,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { if (ctx.getParallelism() > 0) { setParallelism(ctx.getParallelism()); } else { - setParallelism(GlobalConfiguration.getInteger( + setParallelism(GlobalConfiguration.loadConfiguration().getInteger( ConfigConstants.DEFAULT_PARALLELISM_KEY, ConfigConstants.DEFAULT_PARALLELISM)); } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index f7cf160..b1521f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -40,7 +40,7 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { setParallelism(parallelism); } else { // determine parallelism - setParallelism(GlobalConfiguration.getInteger( + setParallelism(GlobalConfiguration.loadConfiguration().getInteger( ConfigConstants.DEFAULT_PARALLELISM_KEY, ConfigConstants.DEFAULT_PARALLELISM)); } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 8ba786f..60ae2ef 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -97,14 +97,6 @@ public class CliFrontendYarnAddressConfigurationTest { System.setErr(ERR); } - @Before - public void clearConfig() throws NoSuchFieldException, IllegalAccessException { - // reset GlobalConfiguration between tests - Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON"); - instance.setAccessible(true); - instance.set(null, null); - } - private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55"; private static final int TEST_YARN_JOB_MANAGER_PORT = 6655; private static final ApplicationId TEST_YARN_APPLICATION_ID = http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 75445e1..9b52975 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -110,7 +110,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { String fsStateHandlePath = tmp.getRoot().getPath(); - flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration()); flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 8a2ad60..3caa0ee 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -225,7 +225,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); flinkYarnClient.setConfigurationDirectory(confDirPath); - flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration()); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); // deploy http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 4df46a6..ba07af1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -81,8 +81,6 @@ import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> { private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class); - private static final String CONFIG_FILE_NAME = "flink-conf.yaml"; - /** * Minimum memory requirements, checked by the Client. */ @@ -142,10 +140,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // tries to load the config through the environment, if it fails it can still be set through the setters try { this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); - GlobalConfiguration.loadConfiguration(configurationDirectory); - this.flinkConfiguration = GlobalConfiguration.getConfiguration(); + this.flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory); - File confFile = new File(configurationDirectory + File.separator + CONFIG_FILE_NAME); + File confFile = new File(configurationDirectory + File.separator + GlobalConfiguration.FLINK_CONF_FILENAME); if (!confFile.exists()) { throw new RuntimeException("Unable to locate configuration file in " + confFile); } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index d19ddde..39b2510 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -418,8 +418,7 @@ public class YarnApplicationMasterRunner { private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) { LOG.info("Loading config from directory " + baseDirectory); - GlobalConfiguration.loadConfiguration(baseDirectory); - Configuration configuration = GlobalConfiguration.getConfiguration(); + Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory); configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);