http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java deleted file mode 100644 index 36d6310..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java +++ /dev/null @@ -1,827 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import akka.actor.ActorSystem; -import akka.dispatch.ExecutionContexts$; -import akka.util.Timeout; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.memory.HeapMemorySegment; -import org.apache.flink.core.memory.HybridMemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.taskmanager.MemoryLogger; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.util.MathUtils; -import org.apache.flink.util.NetUtils; - -import scala.Tuple2; -import scala.Option; -import scala.Some; -import scala.concurrent.ExecutionContext; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.UUID; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * TaskExecutor implementation. The task executor is responsible for the execution of multiple - * {@link org.apache.flink.runtime.taskmanager.Task}. - */ -public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { - - private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); - - /** The unique resource ID of this TaskExecutor */ - private final ResourceID resourceID; - - /** The access to the leader election and metadata storage services */ - private final HighAvailabilityServices haServices; - - /** The task manager configuration */ - private final TaskExecutorConfiguration taskExecutorConfig; - - /** The I/O manager component in the task manager */ - private final IOManager ioManager; - - /** The memory manager component in the task manager */ - private final MemoryManager memoryManager; - - /** The network component in the task manager */ - private final NetworkEnvironment networkEnvironment; - - /** The number of slots in the task manager, should be 1 for YARN */ - private final int numberOfSlots; - - // --------- resource manager -------- - - private TaskExecutorToResourceManagerConnection resourceManagerConnection; - - // ------------------------------------------------------------------------ - - public TaskExecutor( - TaskExecutorConfiguration taskExecutorConfig, - ResourceID resourceID, - MemoryManager memoryManager, - IOManager ioManager, - NetworkEnvironment networkEnvironment, - int numberOfSlots, - RpcService rpcService, - HighAvailabilityServices haServices) { - - super(rpcService); - - this.taskExecutorConfig = checkNotNull(taskExecutorConfig); - this.resourceID = checkNotNull(resourceID); - this.memoryManager = checkNotNull(memoryManager); - this.ioManager = checkNotNull(ioManager); - this.networkEnvironment = checkNotNull(networkEnvironment); - this.numberOfSlots = checkNotNull(numberOfSlots); - this.haServices = checkNotNull(haServices); - } - - // ------------------------------------------------------------------------ - // Life cycle - // ------------------------------------------------------------------------ - - @Override - public void start() { - super.start(); - - // start by connecting to the ResourceManager - try { - haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener()); - } catch (Exception e) { - onFatalErrorAsync(e); - } - } - - // ------------------------------------------------------------------------ - // RPC methods - ResourceManager related - // ------------------------------------------------------------------------ - - @RpcMethod - public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { - if (resourceManagerConnection != null) { - if (newLeaderAddress != null) { - // the resource manager switched to a new leader - log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); - } - else { - // address null means that the current leader is lost without a new leader being there, yet - log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", - resourceManagerConnection.getResourceManagerAddress()); - } - - // drop the current connection or connection attempt - if (resourceManagerConnection != null) { - resourceManagerConnection.close(); - resourceManagerConnection = null; - } - } - - // establish a connection to the new leader - if (newLeaderAddress != null) { - log.info("Attempting to register at ResourceManager {}", newLeaderAddress); - resourceManagerConnection = - new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId); - resourceManagerConnection.start(); - } - } - - /** - * Starts and runs the TaskManager. - * <p/> - * This method first tries to select the network interface to use for the TaskManager - * communication. The network interface is used both for the actor communication - * (coordination) as well as for the data exchange between task managers. Unless - * the hostname/interface is explicitly configured in the configuration, this - * method will try out various interfaces and methods to connect to the JobManager - * and select the one where the connection attempt is successful. - * <p/> - * After selecting the network interface, this method brings up an actor system - * for the TaskManager and its actors, starts the TaskManager's services - * (library cache, shuffle network stack, ...), and starts the TaskManager itself. - * - * @param configuration The configuration for the TaskManager. - * @param resourceID The id of the resource which the task manager will run on. - */ - public static void selectNetworkInterfaceAndRunTaskManager( - Configuration configuration, - ResourceID resourceID) throws Exception { - - final InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); - - runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); - } - - private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) - throws Exception { - String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); - if (taskManagerHostname != null) { - LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); - } else { - LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); - - InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout); - taskManagerHostname = taskManagerAddress.getHostName(); - LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", - taskManagerHostname, taskManagerAddress.getHostAddress()); - } - - // if no task manager port has been configured, use 0 (system will pick any free port) - final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); - if (actorSystemPort < 0 || actorSystemPort > 65535) { - throw new IllegalConfigurationException("Invalid value for '" + - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY + - "' (port for the TaskManager actor system) : " + actorSystemPort + - " - Leave config parameter empty or use 0 to let the system choose a port automatically."); - } - - return new InetSocketAddress(taskManagerHostname, actorSystemPort); - } - - /** - * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its - * actors, starts the TaskManager's services (library cache, shuffle network stack, ...), - * and starts the TaskManager itself. - * <p/> - * This method will also spawn a process reaper for the TaskManager (kill the process if - * the actor fails) and optionally start the JVM memory logging thread. - * - * @param taskManagerHostname The hostname/address of the interface where the actor system - * will communicate. - * @param resourceID The id of the resource which the task manager will run on. - * @param actorSystemPort The port at which the actor system will communicate. - * @param configuration The configuration for the TaskManager. - */ - private static void runTaskManager( - String taskManagerHostname, - ResourceID resourceID, - int actorSystemPort, - final Configuration configuration) throws Exception { - - LOG.info("Starting TaskManager"); - - // Bring up the TaskManager actor system first, bind it to the given address. - - LOG.info("Starting TaskManager actor system at " + - NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)); - - final ActorSystem taskManagerSystem; - try { - Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort); - Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address)); - LOG.debug("Using akka configuration\n " + akkaConfig); - taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); - } catch (Throwable t) { - if (t instanceof org.jboss.netty.channel.ChannelException) { - Throwable cause = t.getCause(); - if (cause != null && t.getCause() instanceof java.net.BindException) { - String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); - throw new IOException("Unable to bind TaskManager actor system to address " + - address + " - " + cause.getMessage(), t); - } - } - throw new Exception("Could not create TaskManager actor system", t); - } - - // start akka rpc service based on actor system - final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS); - final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); - - // start high availability service to implement getResourceManagerLeaderRetriever method only - final HighAvailabilityServices haServices = new HighAvailabilityServices() { - @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { - return LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - } - - @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { - return null; - } - - @Override - public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { - return null; - } - }; - - // start all the TaskManager services (network stack, library cache, ...) - // and the TaskManager actor - try { - LOG.info("Starting TaskManager actor"); - TaskExecutor taskExecutor = startTaskManagerComponentsAndActor( - configuration, - resourceID, - akkaRpcService, - taskManagerHostname, - haServices, - false); - - taskExecutor.start(); - - // if desired, start the logging daemon that periodically logs the memory usage information - if (LOG.isInfoEnabled() && configuration.getBoolean( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) { - LOG.info("Starting periodic memory usage logger"); - - long interval = configuration.getLong( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS); - - MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem); - logger.start(); - } - - // block until everything is done - taskManagerSystem.awaitTermination(); - } catch (Throwable t) { - LOG.error("Error while starting up taskManager", t); - try { - taskManagerSystem.shutdown(); - } catch (Throwable tt) { - LOG.warn("Could not cleanly shut down actor system", tt); - } - throw t; - } - } - - // -------------------------------------------------------------------------- - // Starting and running the TaskManager - // -------------------------------------------------------------------------- - - /** - * @param configuration The configuration for the TaskManager. - * @param resourceID The id of the resource which the task manager will run on. - * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint . - * @param taskManagerHostname The hostname/address that describes the TaskManager's data location. - * @param haServices Optionally, a high availability service can be provided. If none is given, - * then a HighAvailabilityServices is constructed from the configuration. - * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. - * @return An ActorRef to the TaskManager actor. - * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values. - * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools, - * I/O manager, ...) cannot be properly started. - * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration - * or starting the TaskManager components. - */ - public static TaskExecutor startTaskManagerComponentsAndActor( - Configuration configuration, - ResourceID resourceID, - RpcService rpcService, - String taskManagerHostname, - HighAvailabilityServices haServices, - boolean localTaskManagerCommunication) throws Exception { - - final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration( - configuration, taskManagerHostname, localTaskManagerCommunication); - - MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); - - // pre-start checks - checkTempDirs(taskExecutorConfig.getTmpDirPaths()); - - ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool()); - - // we start the network first, to make sure it can allocate its buffers first - final NetworkEnvironment network = new NetworkEnvironment( - executionContext, - taskExecutorConfig.getTimeout(), - taskExecutorConfig.getNetworkConfig(), - taskExecutorConfig.getConnectionInfo()); - - // computing the amount of memory to use depends on how much memory is available - // it strictly needs to happen AFTER the network stack has been initialized - - // check if a value has been configured - long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); - checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory, - ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, - "MemoryManager needs at least one MB of memory. " + - "If you leave this config parameter empty, the system automatically " + - "pick a fraction of the available memory."); - - final long memorySize; - boolean preAllocateMemory = configuration.getBoolean( - ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE); - if (configuredMemory > 0) { - if (preAllocateMemory) { - LOG.info("Using {} MB for managed memory." , configuredMemory); - } else { - LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory); - } - memorySize = configuredMemory << 20; // megabytes to bytes - } else { - float fraction = configuration.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); - checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, - ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); - - if (memType == MemoryType.HEAP) { - long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction); - if (preAllocateMemory) { - LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." , - fraction , relativeMemSize >> 20); - } else { - LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " + - "memory will be allocated lazily." , fraction , relativeMemSize >> 20); - } - memorySize = relativeMemSize; - } else if (memType == MemoryType.OFF_HEAP) { - // The maximum heap memory has been adjusted according to the fraction - long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); - long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction); - if (preAllocateMemory) { - LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." , - fraction, directMemorySize >> 20); - } else { - LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," + - " memory will be allocated lazily.", fraction, directMemorySize >> 20); - } - memorySize = directMemorySize; - } else { - throw new RuntimeException("No supported memory type detected."); - } - } - - // now start the memory manager - final MemoryManager memoryManager; - try { - memoryManager = new MemoryManager( - memorySize, - taskExecutorConfig.getNumberOfSlots(), - taskExecutorConfig.getNetworkConfig().networkBufferSize(), - memType, - preAllocateMemory); - } catch (OutOfMemoryError e) { - if (memType == MemoryType.HEAP) { - throw new Exception("OutOfMemory error (" + e.getMessage() + - ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e); - } else if (memType == MemoryType.OFF_HEAP) { - throw new Exception("OutOfMemory error (" + e.getMessage() + - ") while allocating the TaskManager off-heap memory (" + memorySize + - " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e); - } else { - throw e; - } - } - - // start the I/O manager, it will create some temp directories. - final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); - - final TaskExecutor taskExecutor = new TaskExecutor( - taskExecutorConfig, - resourceID, - memoryManager, - ioManager, - network, - taskExecutorConfig.getNumberOfSlots(), - rpcService, - haServices); - - return taskExecutor; - } - - // -------------------------------------------------------------------------- - // Parsing and checking the TaskManager Configuration - // -------------------------------------------------------------------------- - - /** - * Utility method to extract TaskManager config parameters from the configuration and to - * sanity check them. - * - * @param configuration The configuration. - * @param taskManagerHostname The host name under which the TaskManager communicates. - * @param localTaskManagerCommunication True, to skip initializing the network stack. - * Use only in cases where only one task manager runs. - * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc. - */ - private static TaskExecutorConfiguration parseTaskManagerConfiguration( - Configuration configuration, - String taskManagerHostname, - boolean localTaskManagerCommunication) throws Exception { - - // ------- read values from the config and check them --------- - // (a lot of them) - - // ----> hosts / ports for communication and data exchange - - int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); - if (dataport == 0) { - dataport = NetUtils.getAvailablePort(); - } - checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - "Leave config parameter empty or use 0 to let the system choose a port automatically."); - - InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); - final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport); - - // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories - - // we need this because many configs have been written with a "-1" entry - int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - if (slots == -1) { - slots = 1; - } - checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, - "Number of task slots must be at least one."); - - final int numNetworkBuffers = configuration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS); - checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers, - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, ""); - - final int pageSize = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE); - // check page size of for minimum size - checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); - // check page size for power of two - checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, - "Memory segment size must be a power of 2."); - - // check whether we use heap or off-heap memory - final MemoryType memType; - if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) { - memType = MemoryType.OFF_HEAP; - } else { - memType = MemoryType.HEAP; - } - - // initialize the memory segment factory accordingly - if (memType == MemoryType.HEAP) { - if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to heap memory, but memory segment " + - "factory has been initialized for off-heap memory segments"); - } - } else { - if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) { - throw new Exception("Memory type is set to off-heap memory, but memory segment " + - "factory has been initialized for heap memory segments"); - } - } - - final String[] tmpDirs = configuration.getString( - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); - - final NettyConfig nettyConfig; - if (!localTaskManagerCommunication) { - nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration); - } else { - nettyConfig = null; - } - - // Default spill I/O mode for intermediate results - final String syncOrAsync = configuration.getString( - ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); - - final IOMode ioMode; - if (syncOrAsync.equals("async")) { - ioMode = IOManager.IOMode.ASYNC; - } else { - ioMode = IOManager.IOMode.SYNC; - } - - final int queryServerPort = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_PORT, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT); - - final int queryServerNetworkThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS); - - final int queryServerQueryThreads = configuration.getInteger( - ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS, - ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS); - - final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration( - numNetworkBuffers, - pageSize, - memType, - ioMode, - queryServerPort, - queryServerNetworkThreads, - queryServerQueryThreads, - localTaskManagerCommunication ? Option.<NettyConfig>empty() : new Some<>(nettyConfig), - new Tuple2<>(500, 3000)); - - // ----> timeouts, library caching, profiling - - final FiniteDuration timeout; - try { - timeout = AkkaUtils.getTimeout(configuration); - } catch (Exception e) { - throw new IllegalArgumentException( - "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + - "'.Use formats like '50 s' or '1 min' to specify the timeout."); - } - LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout); - - final long cleanupInterval = configuration.getLong( - ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, - ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; - - final FiniteDuration finiteRegistrationDuration; - try { - Duration maxRegistrationDuration = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); - if (maxRegistrationDuration.isFinite()) { - finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS); - } else { - finiteRegistrationDuration = null; - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); - } - - final FiniteDuration initialRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - final FiniteDuration maxRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - final FiniteDuration refusedRegistrationPause; - try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); - if (pause.isFinite()) { - refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS); - } else { - throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); - } - - return new TaskExecutorConfiguration( - tmpDirs, - cleanupInterval, - connectionInfo, - networkConfig, - timeout, - finiteRegistrationDuration, - slots, - configuration, - initialRegistrationPause, - maxRegistrationPause, - refusedRegistrationPause); - } - - /** - * Validates a condition for a config parameter and displays a standard exception, if the - * the condition does not hold. - * - * @param condition The condition that must hold. If the condition is false, an exception is thrown. - * @param parameter The parameter value. Will be shown in the exception message. - * @param name The name of the config parameter. Will be shown in the exception message. - * @param errorMessage The optional custom error message to append to the exception message. - */ - private static void checkConfigParameter( - boolean condition, - Object parameter, - String name, - String errorMessage) { - if (!condition) { - throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage); - } - } - - /** - * Validates that all the directories denoted by the strings do actually exist, are proper - * directories (not files), and are writable. - * - * @param tmpDirs The array of directory paths to check. - * @throws Exception Thrown if any of the directories does not exist or is not writable - * or is a file, rather than a directory. - */ - private static void checkTempDirs(String[] tmpDirs) throws IOException { - for (String dir : tmpDirs) { - if (dir != null && !dir.equals("")) { - File file = new File(dir); - if (!file.exists()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); - } - if (!file.isDirectory()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); - } - if (!file.canWrite()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable."); - } - - if (LOG.isInfoEnabled()) { - long totalSpaceGb = file.getTotalSpace() >> 30; - long usableSpaceGb = file.getUsableSpace() >> 30; - double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; - String path = file.getAbsolutePath(); - LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", - path, totalSpaceGb, usableSpaceGb, usablePercentage)); - } - } else { - throw new IllegalArgumentException("Temporary file directory #$id is null."); - } - } - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public ResourceID getResourceID() { - return resourceID; - } - - // ------------------------------------------------------------------------ - // Error Handling - // ------------------------------------------------------------------------ - - /** - * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. - * This method should be used when asynchronous threads want to notify the - * TaskExecutor of a fatal error. - * - * @param t The exception describing the fatal error - */ - void onFatalErrorAsync(final Throwable t) { - runAsync(new Runnable() { - @Override - public void run() { - onFatalError(t); - } - }); - } - - /** - * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. - * This method must only be called from within the TaskExecutor's main thread. - * - * @param t The exception describing the fatal error - */ - void onFatalError(Throwable t) { - // to be determined, probably delegate to a fatal error handler that - // would either log (mini cluster) ot kill the process (yarn, mesos, ...) - log.error("FATAL ERROR", t); - } - - // ------------------------------------------------------------------------ - // Access to fields for testing - // ------------------------------------------------------------------------ - - @VisibleForTesting - TaskExecutorToResourceManagerConnection getResourceManagerConnection() { - return resourceManagerConnection; - } - - // ------------------------------------------------------------------------ - // Utility classes - // ------------------------------------------------------------------------ - - /** - * The listener for leader changes of the resource manager - */ - private class ResourceManagerLeaderListener implements LeaderRetrievalListener { - - @Override - public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { - getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); - } - - @Override - public void handleError(Exception exception) { - onFatalErrorAsync(exception); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java deleted file mode 100644 index 32484e1..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorConfiguration.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; - -import scala.concurrent.duration.FiniteDuration; - -import java.io.Serializable; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * {@link TaskExecutor} Configuration - */ -public class TaskExecutorConfiguration implements Serializable { - - private static final long serialVersionUID = 1L; - - private final String[] tmpDirPaths; - - private final long cleanupInterval; - - private final int numberOfSlots; - - private final Configuration configuration; - - private final FiniteDuration timeout; - private final FiniteDuration maxRegistrationDuration; - private final FiniteDuration initialRegistrationPause; - private final FiniteDuration maxRegistrationPause; - private final FiniteDuration refusedRegistrationPause; - - private final NetworkEnvironmentConfiguration networkConfig; - - private final InstanceConnectionInfo connectionInfo; - - public TaskExecutorConfiguration( - String[] tmpDirPaths, - long cleanupInterval, - InstanceConnectionInfo connectionInfo, - NetworkEnvironmentConfiguration networkConfig, - FiniteDuration timeout, - FiniteDuration maxRegistrationDuration, - int numberOfSlots, - Configuration configuration) { - - this (tmpDirPaths, - cleanupInterval, - connectionInfo, - networkConfig, - timeout, - maxRegistrationDuration, - numberOfSlots, - configuration, - new FiniteDuration(500, TimeUnit.MILLISECONDS), - new FiniteDuration(30, TimeUnit.SECONDS), - new FiniteDuration(10, TimeUnit.SECONDS)); - } - - public TaskExecutorConfiguration( - String[] tmpDirPaths, - long cleanupInterval, - InstanceConnectionInfo connectionInfo, - NetworkEnvironmentConfiguration networkConfig, - FiniteDuration timeout, - FiniteDuration maxRegistrationDuration, - int numberOfSlots, - Configuration configuration, - FiniteDuration initialRegistrationPause, - FiniteDuration maxRegistrationPause, - FiniteDuration refusedRegistrationPause) { - - this.tmpDirPaths = checkNotNull(tmpDirPaths); - this.cleanupInterval = checkNotNull(cleanupInterval); - this.connectionInfo = checkNotNull(connectionInfo); - this.networkConfig = checkNotNull(networkConfig); - this.timeout = checkNotNull(timeout); - this.maxRegistrationDuration = maxRegistrationDuration; - this.numberOfSlots = checkNotNull(numberOfSlots); - this.configuration = checkNotNull(configuration); - this.initialRegistrationPause = checkNotNull(initialRegistrationPause); - this.maxRegistrationPause = checkNotNull(maxRegistrationPause); - this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause); - } - - // -------------------------------------------------------------------------------------------- - // Properties - // -------------------------------------------------------------------------------------------- - - public String[] getTmpDirPaths() { - return tmpDirPaths; - } - - public long getCleanupInterval() { - return cleanupInterval; - } - - public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; } - - public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } - - public FiniteDuration getTimeout() { - return timeout; - } - - public FiniteDuration getMaxRegistrationDuration() { - return maxRegistrationDuration; - } - - public int getNumberOfSlots() { - return numberOfSlots; - } - - public Configuration getConfiguration() { - return configuration; - } - - public FiniteDuration getInitialRegistrationPause() { - return initialRegistrationPause; - } - - public FiniteDuration getMaxRegistrationPause() { - return maxRegistrationPause; - } - - public FiniteDuration getRefusedRegistrationPause() { - return refusedRegistrationPause; - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java deleted file mode 100644 index b0b21b0..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import org.apache.flink.runtime.rpc.RpcGateway; - -import java.util.UUID; - -/** - * {@link TaskExecutor} RPC gateway interface - */ -public interface TaskExecutorGateway extends RpcGateway { - - // ------------------------------------------------------------------------ - // ResourceManager handlers - // ------------------------------------------------------------------------ - - void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId); -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java deleted file mode 100644 index 641102d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.rpc.registration.RegistrationResponse; - -import java.io.Serializable; - -/** - * Base class for responses from the ResourceManager to a registration attempt by a - * TaskExecutor. - */ -public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.Success implements Serializable { - - private static final long serialVersionUID = 1L; - - private final InstanceID registrationId; - - private final long heartbeatInterval; - - /** - * Create a new {@code TaskExecutorRegistrationSuccess} message. - * - * @param registrationId The ID that the ResourceManager assigned the registration. - * @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor. - */ - public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) { - this.registrationId = registrationId; - this.heartbeatInterval = heartbeatInterval; - } - - /** - * Gets the ID that the ResourceManager assigned the registration. - */ - public InstanceID getRegistrationId() { - return registrationId; - } - - /** - * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor. - */ - public long getHeartbeatInterval() { - return heartbeatInterval; - } - - @Override - public String toString() { - return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')'; - } - -} - - - - - - - http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java deleted file mode 100644 index 7ccc879..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import akka.dispatch.OnFailure; -import akka.dispatch.OnSuccess; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.registration.RetryingRegistration; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; - -import org.slf4j.Logger; - -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - -/** - * The connection between a TaskExecutor and the ResourceManager. - */ -public class TaskExecutorToResourceManagerConnection { - - /** the logger for all log messages of this class */ - private final Logger log; - - /** the TaskExecutor whose connection to the ResourceManager this represents */ - private final TaskExecutor taskExecutor; - - private final UUID resourceManagerLeaderId; - - private final String resourceManagerAddress; - - private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration; - - private ResourceManagerGateway registeredResourceManager; - - private InstanceID registrationId; - - /** flag indicating that the connection is closed */ - private volatile boolean closed; - - - public TaskExecutorToResourceManagerConnection( - Logger log, - TaskExecutor taskExecutor, - String resourceManagerAddress, - UUID resourceManagerLeaderId) { - - this.log = checkNotNull(log); - this.taskExecutor = checkNotNull(taskExecutor); - this.resourceManagerAddress = checkNotNull(resourceManagerAddress); - this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); - } - - // ------------------------------------------------------------------------ - // Life cycle - // ------------------------------------------------------------------------ - - @SuppressWarnings("unchecked") - public void start() { - checkState(!closed, "The connection is already closed"); - checkState(!isRegistered() && pendingRegistration == null, "The connection is already started"); - - pendingRegistration = new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( - log, taskExecutor.getRpcService(), - resourceManagerAddress, resourceManagerLeaderId, - taskExecutor.getAddress(), taskExecutor.getResourceID()); - pendingRegistration.startRegistration(); - - Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture(); - - future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() { - @Override - public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) { - registeredResourceManager = result.f0; - registrationId = result.f1.getRegistrationId(); - } - }, taskExecutor.getMainThreadExecutionContext()); - - // this future should only ever fail if there is a bug, not if the registration is declined - future.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) { - taskExecutor.onFatalError(failure); - } - }, taskExecutor.getMainThreadExecutionContext()); - } - - public void close() { - closed = true; - - // make sure we do not keep re-trying forever - if (pendingRegistration != null) { - pendingRegistration.cancel(); - } - } - - public boolean isClosed() { - return closed; - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public UUID getResourceManagerLeaderId() { - return resourceManagerLeaderId; - } - - public String getResourceManagerAddress() { - return resourceManagerAddress; - } - - /** - * Gets the ResourceManagerGateway. This returns null until the registration is completed. - */ - public ResourceManagerGateway getResourceManager() { - return registeredResourceManager; - } - - /** - * Gets the ID under which the TaskExecutor is registered at the ResourceManager. - * This returns null until the registration is completed. - */ - public InstanceID getRegistrationId() { - return registrationId; - } - - public boolean isRegistered() { - return registeredResourceManager != null; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return String.format("Connection to ResourceManager %s (leaderId=%s)", - resourceManagerAddress, resourceManagerLeaderId); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static class ResourceManagerRegistration - extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { - - private final String taskExecutorAddress; - - private final ResourceID resourceID; - - ResourceManagerRegistration( - Logger log, - RpcService rpcService, - String targetAddress, - UUID leaderId, - String taskExecutorAddress, - ResourceID resourceID) { - - super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId); - this.taskExecutorAddress = checkNotNull(taskExecutorAddress); - this.resourceID = checkNotNull(resourceID); - } - - @Override - protected Future<RegistrationResponse> invokeRegistration( - ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { - - FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS); - return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java new file mode 100644 index 0000000..a5de2d5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +import java.io.Serializable; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A report about the current status of all slots of the TaskExecutor, describing + * which slots are available and allocated, and what jobs (JobManagers) the allocated slots + * have been allocated to. + */ +public class SlotReport implements Serializable { + + private static final long serialVersionUID = -3150175198722481689L; + + /** The slots status of the TaskManager */ + private final List<SlotStatus> slotsStatus; + + /** The resource id which identifies the TaskManager */ + private final ResourceID resourceID; + + public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) { + this.slotsStatus = checkNotNull(slotsStatus); + this.resourceID = checkNotNull(resourceID); + } + + public List<SlotStatus> getSlotsStatus() { + return slotsStatus; + } + + public ResourceID getResourceID() { + return resourceID; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java new file mode 100644 index 0000000..744b674 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This describes the slot current status which located in TaskManager. + */ +public class SlotStatus implements Serializable { + + private static final long serialVersionUID = 5099191707339664493L; + + /** slotID to identify a slot */ + private final SlotID slotID; + + /** the resource profile of the slot */ + private final ResourceProfile profiler; + + /** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */ + private final AllocationID allocationID; + + /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */ + private final JobID jobID; + + public SlotStatus(SlotID slotID, ResourceProfile profiler) { + this(slotID, profiler, null, null); + } + + public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) { + this.slotID = checkNotNull(slotID, "slotID cannot be null"); + this.profiler = checkNotNull(profiler, "profile cannot be null"); + this.allocationID = allocationID; + this.jobID = jobID; + } + + /** + * Get the unique identification of this slot + * + * @return The slot id + */ + public SlotID getSlotID() { + return slotID; + } + + /** + * Get the resource profile of this slot + * + * @return The resource profile + */ + public ResourceProfile getProfiler() { + return profiler; + } + + /** + * Get the allocation id of this slot + * + * @return The allocation id if this slot is allocated, otherwise null + */ + public AllocationID getAllocationID() { + return allocationID; + } + + /** + * Get the job id of the slot allocated for + * + * @return The job id if this slot is allocated, otherwise null + */ + public JobID getJobID() { + return jobID; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlotStatus that = (SlotStatus) o; + + if (!slotID.equals(that.slotID)) { + return false; + } + if (!profiler.equals(that.profiler)) { + return false; + } + if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) { + return false; + } + return jobID != null ? jobID.equals(that.jobID) : that.jobID == null; + + } + + @Override + public int hashCode() { + int result = slotID.hashCode(); + result = 31 * result + profiler.hashCode(); + result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0); + result = 31 * result + (jobID != null ? jobID.hashCode() : 0); + return result; + } + +}