Repository: flink Updated Branches: refs/heads/flip-6 1f3256b59 -> 17b83f11b
[hotfix] [taskmanager] Fixes TaskManager component creation at startup Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17b83f11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17b83f11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17b83f11 Branch: refs/heads/flip-6 Commit: 17b83f11be467d23458ba1d6299df8b1b1510664 Parents: 1f3256b Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Sep 8 18:43:15 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Sep 8 18:44:25 2016 +0200 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutor.java | 189 ++++++++++++++++--- .../taskexecutor/TaskExecutorConfiguration.java | 9 - 2 files changed, 159 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/17b83f11/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 735730b..a455fe2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -19,9 +19,19 @@ package org.apache.flink.runtime.taskexecutor; import akka.actor.ActorSystem; -import akka.dispatch.ExecutionContexts$; import akka.util.Timeout; import com.typesafe.config.Config; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; +import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +47,6 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -61,7 +70,6 @@ import org.apache.flink.util.NetUtils; import scala.Tuple2; import scala.Option; import scala.Some; -import scala.concurrent.ExecutionContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -70,9 +78,9 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.UUID; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -86,6 +94,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { /** The unique resource ID of this TaskExecutor */ private final ResourceID resourceID; + private final TaskManagerLocation taskManagerLocation; + /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; @@ -113,22 
+123,26 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { public TaskExecutor( TaskExecutorConfiguration taskExecutorConfig, ResourceID resourceID, + TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, - int numberOfSlots, RpcService rpcService, HighAvailabilityServices haServices) { super(rpcService); + checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0."); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.taskManagerLocation = checkNotNull(taskManagerLocation); this.memoryManager = checkNotNull(memoryManager); this.ioManager = checkNotNull(ioManager); this.networkEnvironment = checkNotNull(networkEnvironment); - this.numberOfSlots = checkNotNull(numberOfSlots); this.haServices = checkNotNull(haServices); + + this.numberOfSlots = taskExecutorConfig.getNumberOfSlots(); } // ------------------------------------------------------------------------ @@ -360,10 +374,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { * then a HighAvailabilityServices is constructed from the configuration. * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack. * @return An ActorRef to the TaskManager actor. - * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values. - * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools, + * @throws IllegalConfigurationException Thrown, if the given config contains illegal values. + * @throws IOException Thrown, if any of the I/O components (such as buffer pools, * I/O manager, ...) cannot be properly started. 
- * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration + * @throws Exception Thrown if some other error occurs while parsing the configuration * or starting the TaskManager components. */ public static TaskExecutor startTaskManagerComponentsAndActor( @@ -377,19 +391,105 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration( configuration, taskManagerHostname, localTaskManagerCommunication); + TaskManagerComponents taskManagerComponents = createTaskManagerComponents( + resourceID, + InetAddress.getByName(taskManagerHostname), + taskExecutorConfig, + configuration); + + final TaskExecutor taskExecutor = new TaskExecutor( + taskExecutorConfig, + resourceID, + taskManagerComponents.getTaskManagerLocation(), + taskManagerComponents.getMemoryManager(), + taskManagerComponents.getIOManager(), + taskManagerComponents.getNetworkEnvironment(), + rpcService, + haServices); + + return taskExecutor; + } + + /** + * Creates and returns the task manager components.
+ * + * @param resourceID resource ID of the task manager + * @param taskManagerAddress address of the task manager + * @param taskExecutorConfig task manager configuration + * @param configuration of Flink + * @return task manager components + * @throws Exception + */ + private static TaskExecutor.TaskManagerComponents createTaskManagerComponents( + ResourceID resourceID, + InetAddress taskManagerAddress, + TaskExecutorConfiguration taskExecutorConfig, + Configuration configuration) throws Exception { MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType(); // pre-start checks checkTempDirs(taskExecutorConfig.getTmpDirPaths()); - ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool()); + NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig(); + + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + networkEnvironmentConfiguration.numNetworkBuffers(), + networkEnvironmentConfiguration.networkBufferSize(), + networkEnvironmentConfiguration.memoryType()); + + ConnectionManager connectionManager; + + if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { + connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get()); + } else { + connectionManager = new LocalConnectionManager(); + } + + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); + + KvStateRegistry kvStateRegistry = new KvStateRegistry(); + + KvStateServer kvStateServer; + + if (networkEnvironmentConfiguration.nettyConfig().isDefined()) { + NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get(); + + int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ? 
+ nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads(); + + int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ? + nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads(); + + kvStateServer = new KvStateServer( + taskManagerAddress, + networkEnvironmentConfiguration.queryServerPort(), + numNetworkThreads, + numQueryThreads, + kvStateRegistry, + new DisabledKvStateRequestStats()); + } else { + kvStateServer = null; + } // we start the network first, to make sure it can allocate its buffers first final NetworkEnvironment network = new NetworkEnvironment( - executionContext, - taskExecutorConfig.getTimeout(), - taskExecutorConfig.getNetworkConfig(), - taskExecutorConfig.getConnectionInfo()); + networkBufferPool, + connectionManager, + resultPartitionManager, + taskEventDispatcher, + kvStateRegistry, + kvStateServer, + networkEnvironmentConfiguration.ioMode(), + networkEnvironmentConfiguration.partitionRequestInitialBackoff(), + networkEnvironmentConfiguration.partitinRequestMaxBackoff()); + + network.start(); + + TaskManagerLocation taskManagerLocation = new TaskManagerLocation( + resourceID, + taskManagerAddress, + network.getConnectionManager().getDataPort()); // computing the amount of memory to use depends on how much memory is available // it strictly needs to happen AFTER the network stack has been initialized @@ -473,17 +573,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // start the I/O manager, it will create some temp directories. 
final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths()); - final TaskExecutor taskExecutor = new TaskExecutor( - taskExecutorConfig, - resourceID, - memoryManager, - ioManager, - network, - taskExecutorConfig.getNumberOfSlots(), - rpcService, - haServices); - - return taskExecutor; + return new TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network); } // -------------------------------------------------------------------------- @@ -519,7 +609,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { "Leave config parameter empty or use 0 to let the system choose a port automatically."); InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname); - final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport); + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories @@ -576,7 +666,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { final NettyConfig nettyConfig; if (!localTaskManagerCommunication) { - nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration); + nettyConfig = new NettyConfig( + taskManagerInetSocketAddress.getAddress(), + taskManagerInetSocketAddress.getPort(), + pageSize, + slots, + configuration); } else { nettyConfig = null; } @@ -613,8 +708,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { queryServerPort, queryServerNetworkThreads, queryServerQueryThreads, - localTaskManagerCommunication ? 
Option.<NettyConfig>empty() : new Some<>(nettyConfig), - new Tuple2<>(500, 3000)); + Option.apply(nettyConfig), + 500, + 30000); // ----> timeouts, library caching, profiling @@ -695,7 +791,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { return new TaskExecutorConfiguration( tmpDirs, cleanupInterval, - connectionInfo, networkConfig, timeout, finiteRegistrationDuration, @@ -829,4 +924,38 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { onFatalErrorAsync(exception); } } + + private static class TaskManagerComponents { + private final TaskManagerLocation taskManagerLocation; + private final MemoryManager memoryManager; + private final IOManager ioManager; + private final NetworkEnvironment networkEnvironment; + + private TaskManagerComponents( + TaskManagerLocation taskManagerLocation, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment) { + this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); + this.memoryManager = Preconditions.checkNotNull(memoryManager); + this.ioManager = Preconditions.checkNotNull(ioManager); + this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); + } + + public MemoryManager getMemoryManager() { + return memoryManager; + } + + public IOManager getIOManager() { + return ioManager; + } + + public NetworkEnvironment getNetworkEnvironment() { + return networkEnvironment; + } + + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/17b83f11/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java index 
3707a47..c97c893 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import scala.concurrent.duration.FiniteDuration; @@ -52,12 +51,9 @@ public class TaskExecutorConfiguration implements Serializable { private final NetworkEnvironmentConfiguration networkConfig; - private final InstanceConnectionInfo connectionInfo; - public TaskExecutorConfiguration( String[] tmpDirPaths, long cleanupInterval, - InstanceConnectionInfo connectionInfo, NetworkEnvironmentConfiguration networkConfig, FiniteDuration timeout, FiniteDuration maxRegistrationDuration, @@ -66,7 +62,6 @@ public class TaskExecutorConfiguration implements Serializable { this (tmpDirPaths, cleanupInterval, - connectionInfo, networkConfig, timeout, maxRegistrationDuration, @@ -80,7 +75,6 @@ public class TaskExecutorConfiguration implements Serializable { public TaskExecutorConfiguration( String[] tmpDirPaths, long cleanupInterval, - InstanceConnectionInfo connectionInfo, NetworkEnvironmentConfiguration networkConfig, FiniteDuration timeout, FiniteDuration maxRegistrationDuration, @@ -92,7 +86,6 @@ public class TaskExecutorConfiguration implements Serializable { this.tmpDirPaths = checkNotNull(tmpDirPaths); this.cleanupInterval = checkNotNull(cleanupInterval); - this.connectionInfo = checkNotNull(connectionInfo); this.networkConfig = checkNotNull(networkConfig); this.timeout = checkNotNull(timeout); this.maxRegistrationDuration = maxRegistrationDuration; @@ -115,8 +108,6 @@ public class TaskExecutorConfiguration implements Serializable { return cleanupInterval; } - public InstanceConnectionInfo 
getConnectionInfo() { return connectionInfo; } - public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } public FiniteDuration getTimeout() {