[
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430420#comment-15430420
]
ASF GitHub Bot commented on FLINK-4363:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75645973
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
@@ -35,27 +79,634 @@
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
- /** The unique resource ID of this TaskExecutor */
+ private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+ /** Return code for critical errors during the runtime */
+ private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+ /** The name of the TaskManager actor */
+ private static final String TASK_MANAGER_NAME = "taskmanager";
+
+ /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
- // --------- resource manager --------
+ /** The task manager configuration */
+ private final TaskManagerConfiguration taskManagerConfig;
+
+ /** The connection information of the task manager */
+ private final InstanceConnectionInfo connectionInfo;
+
+ /** The I/O manager component in the task manager */
+ private final IOManager ioManager;
+
+ /** The memory manager component in the task manager */
+ private final MemoryManager memoryManager;
+
+ /** The network component in the task manager */
+ private final NetworkEnvironment networkEnvironment;
+
+ /** The number of slots in the task manager, should be 1 for YARN */
+ private final int numberOfSlots;
+ // --------- resource manager --------
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
// ------------------------------------------------------------------------
public TaskExecutor(
+ TaskManagerConfiguration taskManagerConfig,
+ ResourceID resourceID,
+ InstanceConnectionInfo connectionInfo,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment,
+ int numberOfSlots,
RpcService rpcService,
- HighAvailabilityServices haServices,
- ResourceID resourceID) {
+ HighAvailabilityServices haServices) {
super(rpcService);
- this.haServices = checkNotNull(haServices);
+ this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+ this.connectionInfo = checkNotNull(connectionInfo);
+ this.memoryManager = checkNotNull(memoryManager);
+ this.ioManager = checkNotNull(ioManager);
+ this.networkEnvironment = checkNotNull(networkEnvironment);
+ this.numberOfSlots = checkNotNull(numberOfSlots);
+ this.haServices = checkNotNull(haServices);
+ }
+
+ /**
+ * Starts and runs the TaskManager.
+ * <p/>
+ * This method first tries to select the network interface to use for the TaskManager
+ * communication. The network interface is used both for the actor communication
+ * (coordination) as well as for the data exchange between task managers. Unless
+ * the hostname/interface is explicitly configured in the configuration, this
+ * method will try out various interfaces and methods to connect to the JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ * After selecting the network interface, this method brings up an actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param taskManagerClass The actor class to instantiate.
+ * Allows to use TaskManager subclasses for example for YARN.
+ */
+ public static void selectNetworkInterfaceAndRunTaskManager(
+ Configuration configuration,
+ ResourceID resourceID,
+ Class<? extends TaskManager> taskManagerClass) throws Exception {
+
+ Tuple2<String, Integer> tuple2 = selectNetworkInterfaceAndPort(configuration);
--- End diff --
Do we want to rely on Scala tuples here after the conversion from Scala to
Java? I would suggest `InetAddress` here.
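As a rough illustration of the suggestion, the sketch below returns a typed address instead of a `Tuple2<String, Integer>`. Because the tuple carries both a host and a port, the sketch uses `java.net.InetSocketAddress`, which pairs the two; that choice is an assumption on top of the `InetAddress` suggestion. The class name, the `selectEndpoint` method, and the configuration keys are hypothetical and are not part of the pull request.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

public class EndpointSelectionSketch {

    // Hypothetical configuration keys, used only for this illustration.
    public static final String HOST_KEY = "example.host";
    public static final String PORT_KEY = "example.port";

    /**
     * Returns host and port as one typed value rather than a generic tuple.
     * Hypothetical stand-in for a method like selectNetworkInterfaceAndPort.
     */
    public static InetSocketAddress selectEndpoint(Map<String, String> config) {
        String host = config.getOrDefault(HOST_KEY, "localhost");
        int port = Integer.parseInt(config.getOrDefault(PORT_KEY, "0"));
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        // createUnresolved skips the DNS lookup at selection time.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(HOST_KEY, "worker-1");
        config.put(PORT_KEY, "6122");

        InetSocketAddress endpoint = selectEndpoint(config);
        // Callers read named accessors instead of tuple fields.
        System.out.println(endpoint.getHostString() + ":" + endpoint.getPort());
    }
}
```

With a typed return value, call sites read `getHostString()` and `getPort()` rather than positional tuple fields, and no Scala type leaks into the Java signature.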
> Implement TaskManager basic startup of all components in java
> -------------------------------------------------------------
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Zhijiang Wang
> Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and startup
> of all components in Java instead of Scala.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)