[FLINK-6518] Port blobserver config parameters to ConfigOptions This closes #3865.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d719118 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d719118 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d719118 Branch: refs/heads/master Commit: 4d719118cee3a256722e2bacf276568d601f2b65 Parents: 707f25f Author: zentol <ches...@apache.org> Authored: Wed May 10 10:26:19 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Sat May 13 17:53:20 2017 +0200 ---------------------------------------------------------------------- .../flink/configuration/BlobServerOptions.java | 76 ++++++++++++++++++++ .../flink/configuration/ConfigConstants.java | 40 ++++++----- .../apache/flink/runtime/blob/BlobCache.java | 9 ++- .../apache/flink/runtime/blob/BlobClient.java | 5 +- .../apache/flink/runtime/blob/BlobServer.java | 22 +++--- .../flink/runtime/blob/BlobClientSslTest.java | 5 +- .../flink/runtime/blob/BlobServerRangeTest.java | 8 +-- .../jobmanager/JobManagerStartupTest.java | 4 +- 8 files changed, 125 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java new file mode 100644 index 0000000..e27c29f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for the BlobServer. + */ +@PublicEvolving +public class BlobServerOptions { + + /** + * The config parameter defining the storage directory to be used by the blob server. + */ + public static final ConfigOption<String> STORAGE_DIRECTORY = + key("blob.storage.directory") + .noDefaultValue(); + + /** + * The config parameter defining number of retires for failed BLOB fetches. + */ + public static final ConfigOption<Integer> FETCH_RETRIES = + key("blob.fetch.retries") + .defaultValue(5); + + /** + * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves. + */ + public static final ConfigOption<Integer> FETCH_CONCURRENT = + key("blob.fetch.num-concurrent") + .defaultValue(50); + + /** + * The config parameter defining the backlog of BLOB fetches on the JobManager. + */ + public static final ConfigOption<Integer> FETCH_BACKLOG = + key("blob.fetch.backlog") + .defaultValue(1000); + + /** + * The config parameter defining the server port of the blob service. + * The port can either be a port, such as "9123", + * a range of ports: "50100-50200" + * or a list of ranges and or points: "50100-50200,50300-50400,51234" + * + * Setting the port to 0 will let the OS choose an available port. + */ + public static final ConfigOption<String> PORT = + key("blob.server.port") + .defaultValue("0"); + + /** + * Flag to override ssl support for the blob service transport. + */ + public static final ConfigOption<Boolean> SSL_ENABLED = + key("blob.service.ssl.enabled") + .defaultValue(true); +} http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index c3704be..b5b5486 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -139,36 +139,39 @@ public final class ConfigConstants { public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port"; /** - * The config parameter defining the storage directory to be used by the blob server. + * @deprecated use {@link BlobServerOptions#STORAGE_DIRECTORY} instead */ + @Deprecated public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory"; /** - * The config parameter defining number of retires for failed BLOB fetches. + * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead */ + @Deprecated public static final String BLOB_FETCH_RETRIES_KEY = "blob.fetch.retries"; /** - * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves. + * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead */ + @Deprecated public static final String BLOB_FETCH_CONCURRENT_KEY = "blob.fetch.num-concurrent"; /** - * The config parameter defining the backlog of BLOB fetches on the JobManager + * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead */ + @Deprecated public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog"; /** - * The config parameter defining the server port of the blob service. - * The port can either be a port, such as "9123", - * a range of ports: "50100-50200" - * or a list of ranges and or points: "50100-50200,50300-50400,51234" - * - * Setting the port to 0 will let the OS choose an available port. + * @deprecated use {@link BlobServerOptions#PORT} instead */ + @Deprecated public static final String BLOB_SERVER_PORT = "blob.server.port"; - /** Flag to override ssl support for the blob service transport */ + /** + * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead + */ + @Deprecated public static final String BLOB_SERVICE_SSL_ENABLED = "blob.service.ssl.enabled"; /** @@ -1094,28 +1097,33 @@ public final class ConfigConstants { public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0; /** - * The default value to override ssl support for blob service transport + * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead */ + @Deprecated public static final boolean DEFAULT_BLOB_SERVICE_SSL_ENABLED = true; /** - * Default number of retries for failed BLOB fetches. + * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead */ + @Deprecated public static final int DEFAULT_BLOB_FETCH_RETRIES = 5; /** - * Default number of concurrent BLOB fetch operations. + * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead */ + @Deprecated public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50; /** - * Default BLOB fetch connection backlog. + * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead */ + @Deprecated public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000; /** - * Default BLOB server port. 0 means ephemeral port. + * @deprecated use {@link BlobServerOptions#PORT} instead */ + @Deprecated public static final String DEFAULT_BLOB_SERVER_PORT = "0"; /** http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 2587b15..23c7e63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.util.FileUtils; @@ -129,19 +129,18 @@ public final class BlobCache implements BlobService { this.blobStore = blobStore; // configure and create the storage directory - String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); + String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB cache storage directory " + storageDir); // configure the number of fetch retries - final int fetchRetries = blobClientConfig.getInteger( - ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES); + final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES); if (fetchRetries >= 0) { this.numFetchRetries = fetchRetries; } else { LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", - ConfigConstants.BLOB_FETCH_RETRIES_KEY); + BlobServerOptions.FETCH_RETRIES.key()); this.numFetchRetries = 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index ea90f54..49e54a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; @@ -92,8 +92,7 @@ public final class BlobClient implements Closeable { // Check if ssl is enabled SSLContext clientSSLContext = null; if (clientConfig != null && - clientConfig.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, - ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) { + clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) { clientSSLContext = SSLUtils.createSSLClientContext(clientConfig); } http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 8a70559..0e15777 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -111,34 +111,32 @@ public class BlobServer extends Thread implements BlobService { this.blobStore = checkNotNull(blobStore); // configure and create the storage directory - String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); + String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB server storage directory {}", storageDir); // configure the maximum number of concurrent connections - final int maxConnections = config.getInteger( - ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); + final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT); if (maxConnections >= 1) { this.maxConnections = maxConnections; } else { LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", - maxConnections, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); - this.maxConnections = ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT; + maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue()); + this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue(); } // configure the backlog of connections - int backlog = config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG); + int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG); if (backlog < 1) { LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}", - backlog, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG); - backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG; + backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue()); + backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue(); } this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); - if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, - ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) { + if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) { try { serverSSLContext = SSLUtils.createSSLServerContext(config); } catch (Exception e) { @@ -148,7 +146,7 @@ public class BlobServer extends Thread implements BlobService { // ----------------------- start the server ------------------- - String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT); + String serverPortRange = config.getString(BlobServerOptions.PORT); Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange); http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 5054107..27603d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -31,6 +31,7 @@ import java.security.MessageDigest; import java.util.Collections; import java.util.List; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; @@ -91,7 +92,7 @@ public class BlobClientSslTest { try { Configuration config = new Configuration(); config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); + config.setBoolean(BlobServerOptions.SSL_ENABLED, false); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); @@ -104,7 +105,7 @@ public class BlobClientSslTest { clientConfig = new Configuration(); clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); + clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); } http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index ea0eb94..c3762aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; @@ -38,7 +38,7 @@ public class BlobServerRangeTest extends TestLogger { @Test public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0"); + conf.setString(BlobServerOptions.PORT, "0"); BlobServer srv = new BlobServer(conf); srv.shutdown(); } @@ -59,7 +59,7 @@ public class BlobServerRangeTest extends TestLogger { } Configuration conf = new Configuration(); - conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort())); + conf.setString(BlobServerOptions.PORT, String.valueOf(socket.getLocalPort())); // this thing is going to throw an exception try { @@ -88,7 +88,7 @@ public class BlobServerRangeTest extends TestLogger { } int availablePort = NetUtils.getAvailablePort(); Configuration conf = new Configuration(); - conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort); + conf.setString(BlobServerOptions.PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort); // this thing is going to throw an exception try { http://git-wip-us.apache.org/repos/asf/flink/blob/4d719118/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java index 9ac6873..a906d9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java @@ -30,7 +30,7 @@ import java.util.List; import com.google.common.io.Files; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.StartupUtils; import org.apache.flink.util.NetUtils; @@ -130,7 +130,7 @@ public class JobManagerStartupTest extends TestLogger { } Configuration failConfig = new Configuration(); String nonExistDirectory = new File(blobStorageDirectory, DOES_NOT_EXISTS_NO_SIR).getAbsolutePath(); - failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory); + failConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, nonExistDirectory); try { JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);