Github user wangzhijiang999 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75649383
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
---
@@ -35,27 +79,634 @@
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
- /** The unique resource ID of this TaskExecutor */
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutor.class);
+
+ /** Return code for critical errors during the runtime */
+ private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+ /** The name of the TaskManager actor */
+ private static final String TASK_MANAGER_NAME = "taskmanager";
+
+ /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
- // --------- resource manager --------
+ /** The task manager configuration */
+ private final TaskManagerConfiguration taskManagerConfig;
+
+ /** The connection information of the task manager */
+ private final InstanceConnectionInfo connectionInfo;
+
+ /** The I/O manager component in the task manager */
+ private final IOManager ioManager;
+
+ /** The memory manager component in the task manager */
+ private final MemoryManager memoryManager;
+
+ /** The network component in the task manager */
+ private final NetworkEnvironment networkEnvironment;
+
+ /** The number of slots in the task manager, should be 1 for YARN */
+ private final int numberOfSlots;
+ // --------- resource manager --------
private TaskExecutorToResourceManagerConnection
resourceManagerConnection;
//
------------------------------------------------------------------------
public TaskExecutor(
+ TaskManagerConfiguration taskManagerConfig,
+ ResourceID resourceID,
+ InstanceConnectionInfo connectionInfo,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment,
+ int numberOfSlots,
RpcService rpcService,
- HighAvailabilityServices haServices,
- ResourceID resourceID) {
+ HighAvailabilityServices haServices) {
super(rpcService);
- this.haServices = checkNotNull(haServices);
+ this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+ this.connectionInfo = checkNotNull(connectionInfo);
+ this.memoryManager = checkNotNull(memoryManager);
+ this.ioManager = checkNotNull(ioManager);
+ this.networkEnvironment = checkNotNull(networkEnvironment);
+ this.numberOfSlots = checkNotNull(numberOfSlots);
+ this.haServices = checkNotNull(haServices);
+ }
+
+ /**
+ * Starts and runs the TaskManager.
+ * <p/>
+ * This method first tries to select the network interface to use for
the TaskManager
+ * communication. The network interface is used both for the actor
communication
+ * (coordination) as well as for the data exchange between task
managers. Unless
+ * the hostname/interface is explicitly configured in the
configuration, this
+ * method will try out various interfaces and methods to connect to the
JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ * After selecting the network interface, this method brings up an
actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the
TaskManager itself.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param taskManagerClass The actor class to instantiate.
+ * Allows to use TaskManager subclasses for
example for YARN.
+ */
+ public static void selectNetworkInterfaceAndRunTaskManager(
+ Configuration configuration,
+ ResourceID resourceID,
+ Class<? extends TaskManager> taskManagerClass) throws Exception
{
+
+ Tuple2<String, Integer> tuple2 =
selectNetworkInterfaceAndPort(configuration);
+
+ runTaskManager(tuple2._1(), resourceID, tuple2._2(),
configuration, taskManagerClass);
+ }
+
+ private static Tuple2<String, Integer>
selectNetworkInterfaceAndPort(Configuration configuration)
--- End diff --
It is better to use InetAddress instead of scala tupelo
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---