Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2461#discussion_r77329068
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java ---
    @@ -0,0 +1,198 @@
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.taskmanager.MemoryLogger;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import akka.actor.ActorSystem;
    +import akka.util.Timeout;
    +
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import com.typesafe.config.Config;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A factory for creating {@link TaskExecutor} and starting it in yarn mode.
    + */
    +public class YarnTaskExecutorFactory extends TaskExecutorFactory {
    +
    +   public YarnTaskExecutorFactory(Configuration configuration, ResourceID resourceID) {
    +           super(configuration, resourceID);
    +   }
    +
    +   @Override
    +   public TaskExecutor createAndStartTaskExecutor() throws Exception {
    +           return selectNetworkInterfaceAndRunTaskManager(configuration, resourceID);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager.
    +    * <p/>
    +    * This method first tries to select the network interface to use for the TaskManager
    +    * communication. The network interface is used both for the actor communication
    +    * (coordination) as well as for the data exchange between task managers. Unless
    +    * the hostname/interface is explicitly configured in the configuration, this
    +    * method will try out various interfaces and methods to connect to the JobManager
    +    * and select the one where the connection attempt is successful.
    +    * <p/>
    +    * After selecting the network interface, this method brings up an actor system
    +    * for the TaskManager and its actors, starts the TaskManager's services
    +    * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
    +    *
    +    * @param configuration    The configuration for the TaskManager.
    +    * @param resourceID       The id of the resource which the task manager will run on.
    +    */
    +   private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
    --- End diff --
    
    The parameters differ between modes because I followed the previous **TaskManager** implementation. For example, **YarnTaskManagerRunner** previously invoked the method "selectNetworkInterfaceAndRunTaskManager", while **ForkableFlinkMiniCluster** invoked the method "startTaskManagerComponentsAndActor" to start the **TaskManager**. So I retained the previous startup path for each mode.
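
    A minimal sketch of the arrangement described above, assuming one factory subclass per mode that keeps its mode's previous startup path. Every class here (`ModeFactorySketch`, `SketchTaskExecutor`, `SketchTaskExecutorFactory`, `YarnSketchFactory`, `MiniClusterSketchFactory`) is a hypothetical stand-in rather than code from the pull request; only the two startup method names are taken from the comment, and they appear as plain strings rather than real calls.

```java
// Hypothetical stand-ins only; none of these are the real Flink classes.
class ModeFactorySketch {

    /** Stand-in for a started task executor; records which startup path created it. */
    static final class SketchTaskExecutor {
        final String resourceId;
        final String startupPath;

        SketchTaskExecutor(String resourceId, String startupPath) {
            this.resourceId = resourceId;
            this.startupPath = startupPath;
        }
    }

    /** Common entry point; each mode supplies its own startup logic. */
    abstract static class SketchTaskExecutorFactory {
        protected final String resourceId;

        SketchTaskExecutorFactory(String resourceId) {
            this.resourceId = resourceId;
        }

        abstract SketchTaskExecutor createAndStartTaskExecutor();
    }

    /** YARN mode: keeps the path formerly used by YarnTaskManagerRunner. */
    static final class YarnSketchFactory extends SketchTaskExecutorFactory {
        YarnSketchFactory(String resourceId) {
            super(resourceId);
        }

        @Override
        SketchTaskExecutor createAndStartTaskExecutor() {
            return new SketchTaskExecutor(resourceId, "selectNetworkInterfaceAndRunTaskManager");
        }
    }

    /** Mini-cluster mode: keeps the path formerly used by ForkableFlinkMiniCluster. */
    static final class MiniClusterSketchFactory extends SketchTaskExecutorFactory {
        MiniClusterSketchFactory(String resourceId) {
            super(resourceId);
        }

        @Override
        SketchTaskExecutor createAndStartTaskExecutor() {
            return new SketchTaskExecutor(resourceId, "startTaskManagerComponentsAndActor");
        }
    }

    public static void main(String[] args) {
        SketchTaskExecutorFactory[] factories = {
            new YarnSketchFactory("yarn-container-1"),
            new MiniClusterSketchFactory("local-1")
        };
        // Callers see only the common factory type; the mode decides the startup path.
        for (SketchTaskExecutorFactory factory : factories) {
            SketchTaskExecutor executor = factory.createAndStartTaskExecutor();
            System.out.println(executor.resourceId + " -> " + executor.startupPath);
        }
    }
}
```

    With this shape, callers depend only on the common `createAndStartTaskExecutor()` entry point, while the mode-specific parameters stay inside each subclass.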


