http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java deleted file mode 100644 index 4f756fb..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java +++ /dev/null @@ -1,749 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.memory.HeapMemorySegment; -import org.apache.flink.core.memory.HybridMemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.ConnectionManager; -import org.apache.flink.runtime.io.network.LocalConnectionManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateServer; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.taskexecutor.TaskExecutor; -import org.apache.flink.runtime.taskexecutor.TaskExecutorConfiguration; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.util.MathUtils; -import org.apache.flink.util.NetUtils; - -import akka.actor.ActorSystem; -import akka.util.Timeout; -import com.typesafe.config.Config; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * This class is the executable entry point for the task manager in yarn or standalone mode. - * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service) - * and starts them. - */ -public class TaskManagerRunner { - - private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class); - - /** - * Constructs related components of the TaskManager and starts them. - * - * @param configuration The configuration for the TaskManager. - * @param resourceID The id of the resource which the task manager will run on. - * @param rpcService Optionally, The rpc service which is used to start and connect to the TaskManager RpcEndpoint . - * If none is given, then a RpcService is constructed from the configuration. - * @param taskManagerHostname Optionally, The hostname/address that describes the TaskManager's data location. - * If none is given, it can be got from the configuration. - * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. - * @param haServices Optionally, a high availability service can be provided. If none is given, - * then a HighAvailabilityServices is constructed from the configuration. - */ - public static void createAndStartComponents( - final Configuration configuration, - final ResourceID resourceID, - RpcService rpcService, - String taskManagerHostname, - boolean localTaskManagerCommunication, - HighAvailabilityServices haServices) throws Exception { - - checkNotNull(configuration); - checkNotNull(resourceID); - - if (taskManagerHostname == null || taskManagerHostname.isEmpty()) { - taskManagerHostname = selectNetworkInterface(configuration); - } - - if (rpcService == null) { - // if no task manager port has been configured, use 0 (system will pick any free port) - final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); - if (actorSystemPort < 0 || actorSystemPort > 65535) { - throw new IllegalConfigurationException("Invalid value for '" + - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + - "' (port for the TaskManager actor system) : " + actorSystemPort + - " - Leave config parameter empty or use 0 to let the system choose a port automatically."); - } - rpcService = createRpcService(configuration, taskManagerHostname, actorSystemPort); - } - - if(haServices == null) { - // start high availability service to implement getResourceManagerLeaderRetriever method only - haServices = new HighAvailabilityServices() { - @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { - return LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - } - - @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { - return null; - } - - @Override - public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { - return null; - } - }; - } - - createAndStartTaskManagerComponents( - configuration, - resourceID, - rpcService, - taskManagerHostname, - haServices, - localTaskManagerCommunication); - } - - /** - * <p/> - * This method tries to select the network interface to use for the TaskManager - * communication. The network interface is used both for the actor communication - * (coordination) as well as for the data exchange between task managers. Unless - * the hostname/interface is explicitly configured in the configuration, this - * method will try out various interfaces and methods to connect to the JobManager - * and select the one where the connection attempt is successful. - * <p/> - * - * @param configuration The configuration for the TaskManager. - * @return The host name under which the TaskManager communicates. - */ - private static String selectNetworkInterface(Configuration configuration) throws Exception { - String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); - if (taskManagerHostname != null) { - LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); - } else { - LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); - - InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout); - taskManagerHostname = taskManagerAddress.getHostName(); - LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", - taskManagerHostname, taskManagerAddress.getHostAddress()); - } - - return taskManagerHostname; - } - - /** - * Utility method to create RPC service from configuration and hostname, port. - * - * @param configuration The configuration for the TaskManager. - * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. - * @param actorSystemPort If true, the TaskManager will not initiate the TCP network stack. - * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint . - * @throws java.io.IOException Thrown, if the actor system can not bind to the address - * @throws java.lang.Exception Thrown is some other error occurs while creating akka actor system - */ - private static RpcService createRpcService(Configuration configuration, String taskManagerHostname, int actorSystemPort) - throws Exception{ - - // Bring up the TaskManager actor system first, bind it to the given address. - - LOG.info("Starting TaskManager actor system at " + - NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)); - - final ActorSystem taskManagerSystem; - try { - Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort); - Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address)); - LOG.debug("Using akka configuration\n " + akkaConfig); - taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); - } catch (Throwable t) { - if (t instanceof org.jboss.netty.channel.ChannelException) { - Throwable cause = t.getCause(); - if (cause != null && t.getCause() instanceof java.net.BindException) { - String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); - throw new IOException("Unable to bind TaskManager actor system to address " + - address + " - " + cause.getMessage(), t); - } - } - throw new Exception("Could not create TaskManager actor system", t); - } - - // start akka rpc service based on actor system - final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS); - final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); - - return akkaRpcService; - } - - /** - * @param configuration The configuration for the TaskManager. - * @param resourceID The id of the resource which the task manager will run on. - * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint . - * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. - * @param haServices Optionally, a high availability service can be provided. If none is given, - * then a HighAvailabilityServices is constructed from the configuration. - * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. - * @throws IllegalConfigurationException Thrown, if the given config contains illegal values. - * @throws IOException Thrown, if any of the I/O components (such as buffer pools, I/O manager, ...) - * cannot be properly started. - * @throws Exception Thrown is some other error occurs while parsing the configuration or - * starting the TaskManager components. - */ - private static void createAndStartTaskManagerComponents( - Configuration configuration, - ResourceID resourceID, - RpcService rpcService, - String taskManagerHostname, - HighAvailabilityServices haServices, - boolean localTaskManagerCommunication) throws Exception { - - final TaskExecutorConfiguration taskManagerConfig = parseTaskManagerConfiguration( - configuration, taskManagerHostname, localTaskManagerCommunication); - - TaskManagerComponents taskManagerComponents = createTaskManagerComponents( - resourceID, - InetAddress.getByName(taskManagerHostname), - taskManagerConfig, - configuration); - - final TaskExecutor taskExecutor = new TaskExecutor( - taskManagerConfig, - taskManagerComponents.getTaskManagerLocation(), - rpcService, taskManagerComponents.getMemoryManager(), - taskManagerComponents.getIOManager(), - taskManagerComponents.getNetworkEnvironment(), - haServices); - - taskExecutor.start(); - } - - /** - * Creates and returns the task manager components. - * - * @param resourceID resource ID of the task manager - * @param taskManagerAddress address of the task manager - * @param taskExecutorConfig task manager configuration - * @param configuration of Flink - * @return task manager components - * @throws Exception - */ - private static TaskManagerComponents createTaskManagerComponents( - ResourceID resourceID, - InetAddress taskManagerAddress, - TaskExecutorConfiguration taskExecutorConfig, - Configuration configuration) throws Exception { - - MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); - - // pre-start checks - checkTempDirs(taskExecutorConfig.getTmpDirPaths()); - - NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig(); - - NetworkBufferPool networkBufferPool = new NetworkBufferPool( - networkEnvironmentConfiguration.numNetworkBuffers(), - networkEnvironmentConfiguration.networkBufferSize(), - networkEnvironmentConfiguration.memoryType()); - - ConnectionManager connectionManager; - - if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { - connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get()); - } else { - connectionManager = new LocalConnectionManager(); - } - - ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); - TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); - - KvStateRegistry kvStateRegistry = new KvStateRegistry(); - - KvStateServer kvStateServer; - - if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { - NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get(); - - int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ? - nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads(); - - int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ? - nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads(); - - kvStateServer = new KvStateServer( - taskManagerAddress, - networkEnvironmentConfiguration.queryServerPort(), - numNetworkThreads, - numQueryThreads, - kvStateRegistry, - new DisabledKvStateRequestStats()); - } else { - kvStateServer = null; - } - - // we start the network first, to make sure it can allocate its buffers first - final NetworkEnvironment network = new NetworkEnvironment( - networkBufferPool, - connectionManager, - resultPartitionManager, - taskEventDispatcher, - kvStateRegistry, - kvStateServer, - networkEnvironmentConfiguration.ioMode(), - networkEnvironmentConfiguration.partitionRequestInitialBackoff(), - networkEnvironmentConfiguration.partitinRequestMaxBackoff()); - - network.start(); - - final TaskManagerLocation taskManagerLocation = new TaskManagerLocation( - resourceID, - taskManagerAddress, - network.getConnectionManager().getDataPort()); - - // computing the amount of memory to use depends on how much memory is available - // it strictly needs to happen AFTER the network stack has been initialized - - // check if a value has been configured - long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); - checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, - "MemoryManager needs at least one MB of memory. " + - "If you leave this config parameter empty, the system automatically " + - "pick a fraction of the available memory."); - - final long memorySize; - boolean preAllocateMemory = configuration.getBoolean( - ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); - if (configuredMemory > 0) { - if (preAllocateMemory) { - LOG.info("Using {} MB for managed memory." , configuredMemory); - } else { - LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory); - } - memorySize = configuredMemory << 20; // megabytes to bytes - } else { - float fraction = configuration.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); - checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); - - if (memType == MemoryType.HEAP) { - long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction); - if (preAllocateMemory) { - LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." , - fraction , relativeMemSize >> 20); - } else { - LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " + - "memory will be allocated lazily." , fraction , relativeMemSize >> 20); - } - memorySize = relativeMemSize; - } else if (memType == MemoryType.OFF_HEAP) { - // The maximum heap memory has been adjusted according to the fraction - long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); - long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction); - if (preAllocateMemory) { - LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." , - fraction, directMemorySize >> 20); - } else { - LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," + - " memory will be allocated lazily.", fraction, directMemorySize >> 20); - } - memorySize = directMemorySize; - } else { - throw new RuntimeException("No supported memory type detected."); - } - } - - // now start the memory manager - final MemoryManager memoryManager; - try { - memoryManager = new MemoryManager( - memorySize, - taskExecutorConfig.getNumberOfSlots(), - taskExecutorConfig.getNetworkConfig().networkBufferSize(), - memType, - preAllocateMemory); - } catch (OutOfMemoryError e) { - if (memType == MemoryType.HEAP) { - throw new Exception("OutOfMemory error (" + e.getMessage() + - ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e); - } else if (memType == MemoryType.OFF_HEAP) { - throw new Exception("OutOfMemory error (" + e.getMessage() + - ") while allocating the TaskManager off-heap memory (" + memorySize + - " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e); - } else { - throw e; - } - } - - // start the I/O manager, it will create some temp directories. - final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); - - return new TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network); - } - - // -------------------------------------------------------------------------- - // Parsing and checking the TaskManager Configuration - // -------------------------------------------------------------------------- - - /** - * Utility method to extract TaskManager config parameters from the configuration and to - * sanity check them. - * - * @param configuration The configuration. - * @param taskManagerHostname The host name under which the TaskManager communicates. - * @param localTaskManagerCommunication True, to skip initializing the network stack. - * Use only in cases where only one task manager runs. - * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc. - */ - private static TaskExecutorConfiguration parseTaskManagerConfiguration( - Configuration configuration, - String taskManagerHostname, - boolean localTaskManagerCommunication) throws Exception { - - // ------- read values from the config and check them --------- - // (a lot of them) - - // ----> hosts / ports for communication and data exchange - - int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); - - checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - "Leave config parameter empty or use 0 to let the system choose a port automatically."); - - InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); - final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); - - // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories - - // we need this because many configs have been written with a "-1" entry - int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - if (slots == -1) { - slots = 1; - } - - checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, - "Number of task slots must be at least one."); - - final int numNetworkBuffers = configuration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); - - checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ""); - - final int pageSize = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); - - // check page size of for minimum size - checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); - - // check page size for power of two - checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Memory segment size must be a power of 2."); - - // check whether we use heap or off-heap memory - final MemoryType memType; - if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { - memType = MemoryType.OFF_HEAP; - } else { - memType = MemoryType.HEAP; - } - - // initialize the memory segment factory accordingly - if (memType == MemoryType.HEAP) { - if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to heap memory, but memory segment " + - "factory has been initialized for off-heap memory segments"); - } - } else { - if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to off-heap memory, but memory segment " + - "factory has been initialized for heap memory segments"); - } - } - - final String[] tmpDirs = configuration.getString( - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); - - final NettyConfig nettyConfig; - if (!localTaskManagerCommunication) { - nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), - taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration); - } else { - nettyConfig = null; - } - - // Default spill I/O mode for intermediate results - final String syncOrAsync = configuration.getString( - ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); - - final IOManager.IOMode ioMode; - if (syncOrAsync.equals("async")) { - ioMode = IOManager.IOMode.ASYNC; - } else { - ioMode = IOManager.IOMode.SYNC; - } - - final int queryServerPort = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_PORT, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT); - - final int queryServerNetworkThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS); - - final int queryServerQueryThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS); - - final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration( - numNetworkBuffers, - pageSize, - memType, - ioMode, - queryServerPort, - queryServerNetworkThreads, - queryServerQueryThreads, - Option.apply(nettyConfig), - 500, - 3000); - - // ----> timeouts, library caching, profiling - - final FiniteDuration timeout; - try { - timeout = AkkaUtils.getTimeout(configuration); - } catch (Exception e) { - throw new IllegalArgumentException( - "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + - "'.Use formats like '50 s' or '1 min' to specify the timeout."); - } - LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout); - - final long cleanupInterval = configuration.getLong( - ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, - ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; - - final FiniteDuration finiteRegistrationDuration; - try { - Duration maxRegistrationDuration = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); - if (maxRegistrationDuration.isFinite()) { - finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS); - } else { - finiteRegistrationDuration = null; - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); - } - - final FiniteDuration initialRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - final FiniteDuration maxRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - final FiniteDuration refusedRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - return new TaskExecutorConfiguration( - tmpDirs, - cleanupInterval, - networkConfig, - timeout, - finiteRegistrationDuration, - slots, - configuration, - initialRegistrationPause, - maxRegistrationPause, - refusedRegistrationPause); - } - - /** - * Validates a condition for a config parameter and displays a standard exception, if the - * the condition does not hold. - * - * @param condition The condition that must hold. If the condition is false, an exception is thrown. - * @param parameter The parameter value. Will be shown in the exception message. - * @param name The name of the config parameter. Will be shown in the exception message. - * @param errorMessage The optional custom error message to append to the exception message. - */ - private static void checkConfigParameter( - boolean condition, - Object parameter, - String name, - String errorMessage) { - if (!condition) { - throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage); - } - } - - /** - * Validates that all the directories denoted by the strings do actually exist, are proper - * directories (not files), and are writable. - * - * @param tmpDirs The array of directory paths to check. - * @throws Exception Thrown if any of the directories does not exist or is not writable - * or is a file, rather than a directory. - */ - private static void checkTempDirs(String[] tmpDirs) throws IOException { - for (String dir : tmpDirs) { - if (dir != null && !dir.equals("")) { - File file = new File(dir); - if (!file.exists()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); - } - if (!file.isDirectory()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); - } - if (!file.canWrite()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable."); - } - - if (LOG.isInfoEnabled()) { - long totalSpaceGb = file.getTotalSpace() >> 30; - long usableSpaceGb = file.getUsableSpace() >> 30; - double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; - String path = file.getAbsolutePath(); - LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", - path, totalSpaceGb, usableSpaceGb, usablePercentage)); - } - } else { - throw new IllegalArgumentException("Temporary file directory #$id is null."); - } - } - } - - private static class TaskManagerComponents { - private final TaskManagerLocation taskManagerLocation; - private final MemoryManager memoryManager; - private final IOManager ioManager; - private final NetworkEnvironment networkEnvironment; - - private TaskManagerComponents( - TaskManagerLocation taskManagerLocation, - MemoryManager memoryManager, - IOManager ioManager, - NetworkEnvironment networkEnvironment) { - - this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); - this.memoryManager = Preconditions.checkNotNull(memoryManager); - this.ioManager = Preconditions.checkNotNull(ioManager); - this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); - } - - public MemoryManager getMemoryManager() { - return memoryManager; - } - - public IOManager getIOManager() { - return ioManager; - } - - public NetworkEnvironment getNetworkEnvironment() { - return networkEnvironment; - } - - public TaskManagerLocation getTaskManagerLocation() { - return taskManagerLocation; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java index b6d9306..42655a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; @@ -170,6 +171,12 @@ public class LeaderRetrievalUtils { } public static InetAddress findConnectingAddress( + LeaderRetrievalService leaderRetrievalService, + Time timeout) throws LeaderRetrievalException { + return findConnectingAddress(leaderRetrievalService, new FiniteDuration(timeout.getSize(), timeout.getUnit())); + } + + public static InetAddress findConnectingAddress( LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout) throws LeaderRetrievalException { ConnectionUtils.LeaderConnectingAddressListener listener = new ConnectionUtils.LeaderConnectingAddressListener(); http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index bd3af33..84f5ac7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -94,6 +94,10 @@ object AkkaUtils { createActorSystem(getDefaultAkkaConfig) } + def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = { + getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None) + } + /** * Creates an akka config with the provided configuration values. If the listening address is * specified, then the actor system will listen on the respective address. http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala index 893eaa8..97aae34 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala @@ -30,6 +30,6 @@ case class NetworkEnvironmentConfiguration( queryServerPort: Int, queryServerNetworkThreads: Int, queryServerQueryThreads: Int, - nettyConfig: Option[NettyConfig] = None, + nettyConfig: NettyConfig = null, partitionRequestInitialBackoff: Int = 500, partitinRequestMaxBackoff: Int = 3000) http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index af2b38f..79670a4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1932,7 +1932,7 @@ object TaskManager { netConfig.networkBufferSize, netConfig.memoryType) - val connectionManager = netConfig.nettyConfig match { + val connectionManager = Option(netConfig.nettyConfig) match { case Some(nettyConfig) => new NettyConnectionManager(nettyConfig) case None => new LocalConnectionManager() } @@ -1942,7 +1942,7 @@ object TaskManager { val kvStateRegistry = new KvStateRegistry() - val kvStateServer = netConfig.nettyConfig match { + val kvStateServer = Option(netConfig.nettyConfig) match { case Some(nettyConfig) => val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) { @@ -2274,7 +2274,7 @@ object TaskManager { queryServerPort, queryServerNetworkThreads, queryServerQueryThreads, - nettyConfig) + nettyConfig.getOrElse(null)) // ----> timeouts, library caching, profiling http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index a9ad75d..cc50b66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; -import scala.Some; import scala.concurrent.duration.FiniteDuration; import scala.concurrent.impl.Promise; @@ -75,7 +73,7 @@ public class NetworkEnvironmentTest { 0, 0, 0, - Some.<NettyConfig>empty(), + null, 0, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 5b8e6e6..2a004c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -30,7 +30,6 @@ import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import java.net.InetAddress; import java.util.BitSet; import java.util.UUID; import java.util.concurrent.Callable; http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 26218dd..9c1f288 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -26,8 +26,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; @@ -51,8 +52,8 @@ public class TaskExecutorTest extends TestLogger { try { // register a mock resource manager gateway ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); - TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class); - PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1); + TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); + PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); rpc.registerGateway(resourceManagerAddress, rmGateway); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); @@ -61,12 +62,15 @@ public class TaskExecutorTest extends TestLogger { NonHaServices haServices = new NonHaServices(resourceManagerAddress); TaskExecutor taskManager = new TaskExecutor( - taskExecutorConfiguration, + taskManagerServicesConfiguration, taskManagerLocation, - rpc, mock(MemoryManager.class), + rpc, + mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), - haServices); + haServices, + mock(MetricRegistry.class), + mock(FatalErrorHandler.class)); taskManager.start(); String taskManagerAddress = taskManager.getAddress(); @@ -101,19 +105,22 @@ public class TaskExecutorTest extends TestLogger { TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); haServices.setResourceManagerLeaderRetriever(testLeaderService); - TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class); - PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1); + TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); + PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); TaskExecutor taskManager = new TaskExecutor( - taskExecutorConfiguration, + taskManagerServicesConfiguration, taskManagerLocation, - rpc, mock(MemoryManager.class), + rpc, + mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), - haServices); + haServices, + mock(MetricRegistry.class), + mock(FatalErrorHandler.class)); taskManager.start(); String taskManagerAddress = taskManager.getAddress(); http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 1f93e9b..627a25a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -39,7 +39,6 @@ import org.apache.flink.runtime.io.network.LocalConnectionManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; @@ -105,7 +104,7 @@ public class TaskManagerComponentsStartupShutdownTest { final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0, - Option.<NettyConfig>empty(), 0, 0); + null, 0, 0); ResourceID taskManagerId = ResourceID.generate(); http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java index acfbbfd..c0d0455 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java @@ -29,7 +29,6 @@ import org.junit.Test; import scala.Tuple2; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.lang.reflect.Field;