Repository: flink
Updated Branches:
  refs/heads/flip-6 1f3256b59 -> 17b83f11b


[hotfix] [taskmanager] Fixes TaskManager component creation at startup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17b83f11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17b83f11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17b83f11

Branch: refs/heads/flip-6
Commit: 17b83f11be467d23458ba1d6299df8b1b1510664
Parents: 1f3256b
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Sep 8 18:43:15 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Sep 8 18:44:25 2016 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      | 189 ++++++++++++++++---
 .../taskexecutor/TaskExecutorConfiguration.java |   9 -
 2 files changed, 159 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17b83f11/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 735730b..a455fe2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,9 +19,19 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import akka.actor.ActorSystem;
-import akka.dispatch.ExecutionContexts$;
 import akka.util.Timeout;
 import com.typesafe.config.Config;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +47,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -61,7 +70,6 @@ import org.apache.flink.util.NetUtils;
 import scala.Tuple2;
 import scala.Option;
 import scala.Some;
-import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -70,9 +78,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.UUID;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -86,6 +94,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /** The unique resource ID of this TaskExecutor */
        private final ResourceID resourceID;
 
+       private final TaskManagerLocation taskManagerLocation; // Location of this TaskExecutor: resource ID, address and network data port.
+
        /** The access to the leader election and metadata storage services */
        private final HighAvailabilityServices haServices;
 
@@ -113,22 +123,26 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        public TaskExecutor(
                        TaskExecutorConfiguration taskExecutorConfig,
                        ResourceID resourceID,
+                       TaskManagerLocation taskManagerLocation, // now passed in explicitly; connectionInfo was removed from TaskExecutorConfiguration
                        MemoryManager memoryManager,
                        IOManager ioManager,
                        NetworkEnvironment networkEnvironment,
-                       int numberOfSlots,
                        RpcService rpcService,
                        HighAvailabilityServices haServices) {
 
                super(rpcService);
 
+               checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The 
number of slots has to be larger than 0."); // NOTE(review): dereferences taskExecutorConfig before the checkNotNull below, so a null config fails with a message-less NPE; consider null-checking it first.
+
                this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
                this.resourceID = checkNotNull(resourceID);
+               this.taskManagerLocation = checkNotNull(taskManagerLocation);
                this.memoryManager = checkNotNull(memoryManager);
                this.ioManager = checkNotNull(ioManager);
                this.networkEnvironment = checkNotNull(networkEnvironment);
-               this.numberOfSlots = checkNotNull(numberOfSlots);
                this.haServices = checkNotNull(haServices);
+
+               this.numberOfSlots = taskExecutorConfig.getNumberOfSlots(); // slot count is now read from the config instead of a separate constructor argument
        }
 
        // 
------------------------------------------------------------------------
@@ -360,10 +374,10 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
         *                                      then a HighAvailabilityServices 
is constructed from the configuration.
         * @param localTaskManagerCommunication     If true, the TaskManager 
will not initiate the TCP network stack.
         * @return An ActorRef to the TaskManager actor.
-        * @throws org.apache.flink.configuration.IllegalConfigurationException 
    Thrown, if the given config contains illegal values.
-        * @throws java.io.IOException      Thrown, if any of the I/O 
components (such as buffer pools,
+        * @throws IllegalConfigurationException     Thrown, if the given 
config contains illegal values.
+        * @throws IOException      Thrown, if any of the I/O components (such 
as buffer pools,
         *                                       I/O manager, ...) cannot be 
properly started.
-        * @throws java.lang.Exception      Thrown is some other error occurs 
while parsing the configuration
+        * @throws Exception      Thrown if some other error occurs while 
parsing the configuration
         *                                      or starting the TaskManager 
components.
         */
        public static TaskExecutor startTaskManagerComponentsAndActor(
@@ -377,19 +391,105 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                final TaskExecutorConfiguration taskExecutorConfig = 
parseTaskManagerConfiguration(
                        configuration, taskManagerHostname, 
localTaskManagerCommunication);
 
+               TaskManagerComponents taskManagerComponents = 
createTaskManagerComponents(
+                       resourceID,
+                       InetAddress.getByName(taskManagerHostname),
+                       taskExecutorConfig,
+                       configuration);
+
+               final TaskExecutor taskExecutor = new TaskExecutor(
+                       taskExecutorConfig,
+                       resourceID,
+                       taskManagerComponents.getTaskManagerLocation(),
+                       taskManagerComponents.getMemoryManager(),
+                       taskManagerComponents.getIOManager(),
+                       taskManagerComponents.getNetworkEnvironment(),
+                       rpcService,
+                       haServices);
+
+               return taskExecutor;
+       }
+
+       /**
+        * Creates and returns the task manager components.
+        *
+        * @param resourceID resource ID of the task manager
+        * @param taskManagerAddress address of the task manager
+        * @param taskExecutorConfig task manager configuration
+        * @param configuration the Flink configuration
+        * @return task manager components
+        * @throws Exception if any of the components cannot be created or started
+        */
+       private static TaskExecutor.TaskManagerComponents 
createTaskManagerComponents(
+               ResourceID resourceID,
+               InetAddress taskManagerAddress,
+               TaskExecutorConfiguration taskExecutorConfig,
+               Configuration configuration) throws Exception {
                MemoryType memType = 
taskExecutorConfig.getNetworkConfig().memoryType();
 
                // pre-start checks
                checkTempDirs(taskExecutorConfig.getTmpDirPaths());
 
-               ExecutionContext executionContext = 
ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+               NetworkEnvironmentConfiguration networkEnvironmentConfiguration 
= taskExecutorConfig.getNetworkConfig();
+
+               NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+                       networkEnvironmentConfiguration.numNetworkBuffers(),
+                       networkEnvironmentConfiguration.networkBufferSize(),
+                       networkEnvironmentConfiguration.memoryType());
+
+               ConnectionManager connectionManager;
+
+               if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+                       connectionManager = new 
NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
+               } else {
+                       connectionManager = new LocalConnectionManager();
+               }
+
+               ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+               TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
+
+               KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+               KvStateServer kvStateServer;
+
+               if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+                       NettyConfig nettyConfig = 
networkEnvironmentConfiguration.nettyConfig().get();
+
+                       int numNetworkThreads = 
networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+                               nettyConfig.getNumberOfSlots() : 
networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+                       int numQueryThreads = 
networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+                               nettyConfig.getNumberOfSlots() : 
networkEnvironmentConfiguration.queryServerQueryThreads();
+
+                       kvStateServer = new KvStateServer(
+                               taskManagerAddress,
+                               
networkEnvironmentConfiguration.queryServerPort(),
+                               numNetworkThreads,
+                               numQueryThreads,
+                               kvStateRegistry,
+                               new DisabledKvStateRequestStats());
+               } else {
+                       kvStateServer = null;
+               }
 
                // we start the network first, to make sure it can allocate its 
buffers first
                final NetworkEnvironment network = new NetworkEnvironment(
-                       executionContext,
-                       taskExecutorConfig.getTimeout(),
-                       taskExecutorConfig.getNetworkConfig(),
-                       taskExecutorConfig.getConnectionInfo());
+                       networkBufferPool,
+                       connectionManager,
+                       resultPartitionManager,
+                       taskEventDispatcher,
+                       kvStateRegistry,
+                       kvStateServer,
+                       networkEnvironmentConfiguration.ioMode(),
+                       
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+                       
networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+               network.start();
+
+               TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(
+                       resourceID,
+                       taskManagerAddress,
+                       network.getConnectionManager().getDataPort());
 
                // computing the amount of memory to use depends on how much 
memory is available
                // it strictly needs to happen AFTER the network stack has been 
initialized
@@ -473,17 +573,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                // start the I/O manager, it will create some temp directories.
                final IOManager ioManager = new 
IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
 
-               final TaskExecutor taskExecutor = new TaskExecutor(
-                       taskExecutorConfig,
-                       resourceID,
-                       memoryManager,
-                       ioManager,
-                       network,
-                       taskExecutorConfig.getNumberOfSlots(),
-                       rpcService,
-                       haServices);
-
-               return taskExecutor;
+               return new 
TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, 
ioManager, network);
        }
 
        // 
--------------------------------------------------------------------------
@@ -519,7 +609,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
 
                InetAddress taskManagerAddress = 
InetAddress.getByName(taskManagerHostname);
-               final InstanceConnectionInfo connectionInfo = new 
InstanceConnectionInfo(taskManagerAddress, dataport);
+               final InetSocketAddress taskManagerInetSocketAddress = new 
InetSocketAddress(taskManagerAddress, dataport);
 
                // ----> memory / network stack (shuffles/broadcasts), task 
slots, temp directories
 
@@ -576,7 +666,12 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                final NettyConfig nettyConfig;
                if (!localTaskManagerCommunication) {
-                       nettyConfig = new NettyConfig(connectionInfo.address(), 
connectionInfo.dataPort(), pageSize, slots, configuration);
+                       nettyConfig = new NettyConfig(
+                               taskManagerInetSocketAddress.getAddress(),
+                               taskManagerInetSocketAddress.getPort(),
+                               pageSize,
+                               slots,
+                               configuration);
                } else {
                        nettyConfig = null;
                }
@@ -613,8 +708,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        queryServerPort,
                        queryServerNetworkThreads,
                        queryServerQueryThreads,
-                       localTaskManagerCommunication ? 
Option.<NettyConfig>empty() : new Some<>(nettyConfig),
-                       new Tuple2<>(500, 3000));
+                       Option.apply(nettyConfig),
+                       500,
+                       30000);
 
                // ----> timeouts, library caching, profiling
 
@@ -695,7 +791,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                return new TaskExecutorConfiguration(
                        tmpDirs,
                        cleanupInterval,
-                       connectionInfo,
                        networkConfig,
                        timeout,
                        finiteRegistrationDuration,
@@ -829,4 +924,38 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        onFatalErrorAsync(exception);
                }
        }
+
+       private static class TaskManagerComponents { // Immutable holder for the components built by createTaskManagerComponents and passed to the TaskExecutor constructor.
+               private final TaskManagerLocation taskManagerLocation;
+               private final MemoryManager memoryManager;
+               private final IOManager ioManager;
+               private final NetworkEnvironment networkEnvironment;
+
+               private TaskManagerComponents( // every argument is required (each is checked non-null below)
+                               TaskManagerLocation taskManagerLocation,
+                               MemoryManager memoryManager,
+                               IOManager ioManager,
+                               NetworkEnvironment networkEnvironment) {
+                       this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
+                       this.memoryManager = 
Preconditions.checkNotNull(memoryManager);
+                       this.ioManager = Preconditions.checkNotNull(ioManager);
+                       this.networkEnvironment = 
Preconditions.checkNotNull(networkEnvironment);
+               }
+
+               public MemoryManager getMemoryManager() {
+                       return memoryManager;
+               }
+
+               public IOManager getIOManager() {
+                       return ioManager;
+               }
+
+               public NetworkEnvironment getNetworkEnvironment() {
+                       return networkEnvironment;
+               }
+
+               public TaskManagerLocation getTaskManagerLocation() {
+                       return taskManagerLocation;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/17b83f11/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
index 3707a47..c97c893 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -52,12 +51,9 @@ public class TaskExecutorConfiguration implements 
Serializable {
 
        private final NetworkEnvironmentConfiguration networkConfig;
 
-       private final InstanceConnectionInfo connectionInfo;
-
        public TaskExecutorConfiguration(
                        String[] tmpDirPaths,
                        long cleanupInterval,
-                       InstanceConnectionInfo connectionInfo,
                        NetworkEnvironmentConfiguration networkConfig,
                        FiniteDuration timeout,
                        FiniteDuration maxRegistrationDuration,
@@ -66,7 +62,6 @@ public class TaskExecutorConfiguration implements 
Serializable {
 
                this (tmpDirPaths,
                        cleanupInterval,
-                       connectionInfo,
                        networkConfig,
                        timeout,
                        maxRegistrationDuration,
@@ -80,7 +75,6 @@ public class TaskExecutorConfiguration implements 
Serializable {
        public TaskExecutorConfiguration(
                        String[] tmpDirPaths,
                        long cleanupInterval,
-                       InstanceConnectionInfo connectionInfo,
                        NetworkEnvironmentConfiguration networkConfig,
                        FiniteDuration timeout,
                        FiniteDuration maxRegistrationDuration,
@@ -92,7 +86,6 @@ public class TaskExecutorConfiguration implements 
Serializable {
 
                this.tmpDirPaths = checkNotNull(tmpDirPaths);
                this.cleanupInterval = checkNotNull(cleanupInterval);
-               this.connectionInfo = checkNotNull(connectionInfo);
                this.networkConfig = checkNotNull(networkConfig);
                this.timeout = checkNotNull(timeout);
                this.maxRegistrationDuration = maxRegistrationDuration;
@@ -115,8 +108,6 @@ public class TaskExecutorConfiguration implements 
Serializable {
                return cleanupInterval;
        }
 
-       public InstanceConnectionInfo getConnectionInfo() { return 
connectionInfo; }
-
        public NetworkEnvironmentConfiguration getNetworkConfig() { return 
networkConfig; }
 
        public FiniteDuration getTimeout() {

Reply via email to