GitHub user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r76084398
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
---
@@ -36,27 +82,617 @@
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
- /** The unique resource ID of this TaskExecutor */
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutor.class);
+
+ /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
- // --------- resource manager --------
+ /** The task manager configuration */
+ private final TaskExecutorConfiguration taskExecutorConfig;
+
+ /** The connection information of the task manager */
+ private final InstanceConnectionInfo connectionInfo;
+
+ /** The I/O manager component in the task manager */
+ private final IOManager ioManager;
+
+ /** The memory manager component in the task manager */
+ private final MemoryManager memoryManager;
+
+ /** The network component in the task manager */
+ private final NetworkEnvironment networkEnvironment;
+
+ /** The number of slots in the task manager, should be 1 for YARN */
+ private final int numberOfSlots;
+ // --------- resource manager --------
private TaskExecutorToResourceManagerConnection
resourceManagerConnection;
//
------------------------------------------------------------------------
public TaskExecutor(
+ TaskExecutorConfiguration taskExecutorConfig,
+ ResourceID resourceID,
+ InstanceConnectionInfo connectionInfo,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment,
+ int numberOfSlots,
RpcService rpcService,
- HighAvailabilityServices haServices,
- ResourceID resourceID) {
+ HighAvailabilityServices haServices) {
super(rpcService);
- this.haServices = checkNotNull(haServices);
+ this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+ this.connectionInfo = checkNotNull(connectionInfo);
+ this.memoryManager = checkNotNull(memoryManager);
+ this.ioManager = checkNotNull(ioManager);
+ this.networkEnvironment = checkNotNull(networkEnvironment);
+ this.numberOfSlots = checkNotNull(numberOfSlots);
+ this.haServices = checkNotNull(haServices);
+ }
+
+ /**
+ * Starts and runs the TaskManager.
+ * <p/>
+ * This method first tries to select the network interface to use for
the TaskManager
+ * communication. The network interface is used both for the actor
communication
+ * (coordination) as well as for the data exchange between task
managers. Unless
+ * the hostname/interface is explicitly configured in the
configuration, this
+ * method will try out various interfaces and methods to connect to the
JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ * After selecting the network interface, this method brings up an
actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the
TaskManager itself.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task
manager will run on.
+ */
+ public static void selectNetworkInterfaceAndRunTaskManager(
+ Configuration configuration,
+ ResourceID resourceID) throws Exception {
+
+ InetSocketAddress taskManagerAddress =
selectNetworkInterfaceAndPort(configuration);
+
+ runTaskManager(taskManagerAddress.getHostName(), resourceID,
taskManagerAddress.getPort(), configuration);
+ }
+
+ private static InetSocketAddress
selectNetworkInterfaceAndPort(Configuration configuration)
+ throws Exception {
+ String taskManagerHostname =
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+ if (taskManagerHostname != null) {
+ LOG.info("Using configured hostname/address for
TaskManager: " + taskManagerHostname);
+ } else {
+ LeaderRetrievalService leaderRetrievalService =
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ FiniteDuration lookupTimeout =
AkkaUtils.getLookupTimeout(configuration);
+
+ InetAddress taskManagerAddress =
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService,
lookupTimeout);
+ taskManagerHostname = taskManagerAddress.getHostName();
+ LOG.info("TaskManager will use hostname/address '{}'
({}) for communication.",
+ taskManagerHostname,
taskManagerAddress.getHostAddress());
+ }
+
+ // if no task manager port has been configured, use 0 (system
will pick any free port)
+ int actorSystemPort =
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ if (actorSystemPort < 0 || actorSystemPort > 65535) {
+ throw new IllegalConfigurationException("Invalid value
for '" +
+ ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+ "' (port for the TaskManager actor system) : "
+ actorSystemPort +
+ " - Leave config parameter empty or use 0 to
let the system choose a port automatically.");
+ }
+
+ return new InetSocketAddress(taskManagerHostname,
actorSystemPort);
+ }
+
+ /**
+ * Starts and runs the TaskManager. Brings up an actor system for the
TaskManager and its
+ * actors, starts the TaskManager's services (library cache, shuffle
network stack, ...),
+ * and starts the TaskManager itself.
+ * <p/>
+ * This method will also spawn a process reaper for the TaskManager
(kill the process if
+ * the actor fails) and optionally start the JVM memory logging thread.
+ *
+ * @param taskManagerHostname The hostname/address of the interface
where the actor system
+ * will communicate.
+ * @param resourceID The id of the resource which the task
manager will run on.
+ * @param actorSystemPort The port at which the actor system will
communicate.
+ * @param configuration The configuration for the TaskManager.
+ */
+ private static void runTaskManager(
+ String taskManagerHostname,
+ ResourceID resourceID,
+ int actorSystemPort,
+ final Configuration configuration) throws Exception {
+
+ LOG.info("Starting TaskManager");
+
+ // Bring up the TaskManager actor system first, bind it to the
given address.
+
+ LOG.info("Starting TaskManager actor system at " +
+ NetUtils.hostAndPortToUrlString(taskManagerHostname,
actorSystemPort));
+
+ final ActorSystem taskManagerSystem;
+ try {
+ Tuple2<String, Object> address = new Tuple2<String,
Object>(taskManagerHostname, actorSystemPort);
+ Config akkaConfig =
AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+ LOG.debug("Using akka configuration\n " + akkaConfig);
+ taskManagerSystem =
AkkaUtils.createActorSystem(akkaConfig);
+ } catch (Throwable t) {
+ if (t instanceof
org.jboss.netty.channel.ChannelException) {
+ Throwable cause = t.getCause();
+ if (cause != null && t.getCause() instanceof
java.net.BindException) {
+ String address =
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+ throw new IOException("Unable to bind
TaskManager actor system to address " +
+ address + " - " +
cause.getMessage(), t);
+ }
+ }
+ throw new Exception("Could not create TaskManager actor
system", t);
+ }
+
+ // start akka rpc service based on actor system
+ Timeout timeout = new
Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+ AkkaRpcService akkaRpcService = new
AkkaRpcService(taskManagerSystem, timeout);
+
+ // start high availability service to implement
getResourceManagerLeaderRetriever method only
+ HighAvailabilityServices haServices = new
HighAvailabilityServices() {
+ @Override
+ public LeaderRetrievalService
getResourceManagerLeaderRetriever() throws Exception {
+ return
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ }
+
+ @Override
+ public LeaderElectionService
getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+ return null;
+ }
+ };
+
+ // start all the TaskManager services (network stack, library
cache, ...)
+ // and the TaskManager actor
+ try {
+ LOG.info("Starting TaskManager actor");
+ TaskExecutor taskExecutor =
startTaskManagerComponentsAndActor(
+ configuration,
+ resourceID,
+ akkaRpcService,
+ taskManagerHostname,
+ haServices,
+ false);
+
+ taskExecutor.start();
+
+ // if desired, start the logging daemon that
periodically logs the memory usage information
+ if (LOG.isInfoEnabled() && configuration.getBoolean(
+
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+ LOG.info("Starting periodic memory usage
logger");
+
+ long interval = configuration.getLong(
+
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
+
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
+
+ MemoryLogger logger = new MemoryLogger(LOG,
interval, taskManagerSystem);
+ logger.start();
+ }
+
+ // block until everything is done
+ taskManagerSystem.awaitTermination();
+ } catch (Throwable t) {
+ LOG.error("Error while starting up taskManager", t);
+ try {
+ taskManagerSystem.shutdown();
+ } catch (Throwable tt) {
+ LOG.warn("Could not cleanly shut down actor
system", tt);
+ }
+ throw t;
+ }
+ }
+
+ //
--------------------------------------------------------------------------
+ // Starting and running the TaskManager
+ //
--------------------------------------------------------------------------
+
+ /**
+ * @param configuration The configuration for the
TaskManager.
+ * @param resourceID The id of the resource which
the task manager will run on.
+ * @param rpcService The rpc service which is used to
start and connect to the TaskManager RpcEndpoint .
+ * @param taskManagerHostname The hostname/address that describes
the TaskManager's data location.
+ * @param haServices Optionally, a high availability service can
be provided. If none is given,
+ * then a HighAvailabilityServices
is constructed from the configuration.
+ * @param localTaskManagerCommunication If true, the TaskManager will
not initiate the TCP network stack.
+ * @return An ActorRef to the TaskManager actor.
+ * @throws org.apache.flink.configuration.IllegalConfigurationException
Thrown, if the given config contains illegal values.
+ * @throws java.io.IOException Thrown, if any of the I/O
components (such as buffer pools,
+ * I/O manager, ...) cannot be
properly started.
+ * @throws java.lang.Exception Thrown is some other error occurs
while parsing the configuration
+ * or starting the TaskManager
components.
+ */
+ public static TaskExecutor startTaskManagerComponentsAndActor(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ String taskManagerHostname,
+ HighAvailabilityServices haServices,
+ boolean localTaskManagerCommunication) throws Exception {
+
+ Tuple4<TaskExecutorConfiguration,
NetworkEnvironmentConfiguration, InstanceConnectionInfo, MemoryType> tuple4
+ = parseTaskManagerConfiguration(configuration,
taskManagerHostname, localTaskManagerCommunication);
+
+ TaskExecutorConfiguration taskExecutorConfig = tuple4.f0;
+ NetworkEnvironmentConfiguration netConfig = tuple4.f1;
+ InstanceConnectionInfo connectionInfo = tuple4.f2;
+ MemoryType memType = tuple4.f3;
+
+ // pre-start checks
+ checkTempDirs(taskExecutorConfig.getTmpDirPaths());
+
+ ExecutionContext executionContext =
ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+
+ // we start the network first, to make sure it can allocate its
buffers first
+ NetworkEnvironment network = new
NetworkEnvironment(executionContext, taskExecutorConfig.getTimeout(),
netConfig);
+
+ // computing the amount of memory to use depends on how much
memory is available
+ // it strictly needs to happen AFTER the network stack has been
initialized
+
+ // check if a value has been configured
+ long configuredMemory =
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+ checkConfigParameter(configuredMemory == -1 || configuredMemory
> 0, configuredMemory,
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the
system automatically " +
+ "pick a fraction of the available memory.");
+
+ long memorySize;
+ boolean preAllocateMemory = configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+ if (configuredMemory > 0) {
+ if (preAllocateMemory) {
+ LOG.info("Using {} MB for managed memory." ,
configuredMemory);
+ } else {
+ LOG.info("Limiting managed memory to {} MB,
memory will be allocated lazily." , configuredMemory);
+ }
+ memorySize = configuredMemory << 20; // megabytes to
bytes
+ } else {
+ float fraction = configuration.getFloat(
+
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+ checkConfigParameter(fraction > 0.0f && fraction <
1.0f, fraction,
+
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ "MemoryManager fraction of the free memory must
be between 0.0 and 1.0");
+
+ if (memType == MemoryType.HEAP) {
+ long relativeMemSize = (long)
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the currently
free heap space for managed heap memory ({} MB)." ,
+ fraction , relativeMemSize >>
20);
+ } else {
+ LOG.info("Limiting managed memory to {}
of the currently free heap space ({} MB), " +
+ "memory will be allocated
lazily." , fraction , relativeMemSize >> 20);
+ }
+ memorySize = relativeMemSize;
+ } else if (memType == MemoryType.OFF_HEAP) {
+ // The maximum heap memory has been adjusted
according to the fraction
+ long maxMemory =
EnvironmentInformation.getMaxJvmHeapMemory();
+ long directMemorySize = (long) (maxMemory /
(1.0 - fraction) * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the maximum
memory size for managed off-heap memory ({} MB)." ,
+ fraction, directMemorySize >>
20);
+ } else {
+ LOG.info("Limiting managed memory to {}
of the maximum memory size ({} MB)," +
+ " memory will be allocated
lazily.", fraction, directMemorySize >> 20);
+ }
+ memorySize = directMemorySize;
+ } else {
+ throw new RuntimeException("No supported memory
type detected.");
+ }
+ }
+
+ // now start the memory manager
+ final MemoryManager memoryManager;
+ try {
+ memoryManager = new MemoryManager(
+ memorySize,
+ taskExecutorConfig.getNumberOfSlots(),
+ netConfig.networkBufferSize(),
+ memType,
+ preAllocateMemory);
+ } catch (OutOfMemoryError e) {
+ if (memType == MemoryType.HEAP) {
+ throw new Exception("OutOfMemory error (" +
e.getMessage() +
+ ") while allocating the TaskManager
heap memory (" + memorySize + " bytes).", e);
+ } else if (memType == MemoryType.OFF_HEAP) {
+ throw new Exception("OutOfMemory error (" +
e.getMessage() +
+ ") while allocating the TaskManager
off-heap memory (" + memorySize +
+ " bytes).Try increasing the maximum
direct memory (-XX:MaxDirectMemorySize)", e);
+ } else {
+ throw e;
+ }
+ }
+
+ // start the I/O manager, it will create some temp directories.
+ IOManager ioManager = new
IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
+
+ TaskExecutor taskExecutor = new TaskExecutor(
+ taskExecutorConfig,
+ resourceID,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ taskExecutorConfig.getNumberOfSlots(),
+ rpcService,
+ haServices);
+
+ return taskExecutor;
+ }
+
+ //
--------------------------------------------------------------------------
+ // Parsing and checking the TaskManager Configuration
+ //
--------------------------------------------------------------------------
+
+ /**
+ * Utility method to extract TaskManager config parameters from the
configuration and to
+ * sanity check them.
+ *
+ * @param configuration The configuration.
+ * @param taskManagerHostname The host name under which the
TaskManager communicates.
+ * @param localTaskManagerCommunication True, to skip initializing the
network stack.
+ * Use only in cases where only
one task manager runs.
+ * @return A tuple (TaskManagerConfiguration, network configuration,
+ * InstanceConnectionInfo, JobManager actor Akka URL).
+ */
+ private static Tuple4<TaskExecutorConfiguration,
NetworkEnvironmentConfiguration, InstanceConnectionInfo, MemoryType>
+ parseTaskManagerConfiguration(Configuration configuration,
String taskManagerHostname, boolean localTaskManagerCommunication)
+ throws Exception {
+
+ // ------- read values from the config and check them ---------
+ // (a lot of them)
+
+ // ----> hosts / ports for communication and data exchange
+
+ int dataport =
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+ if (dataport == 0) {
+ dataport = NetUtils.getAvailablePort();
+ }
+ checkConfigParameter(dataport > 0, dataport,
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ "Leave config parameter empty or use 0 to let the
system choose a port automatically.");
+
+ InetAddress taskManagerAddress =
InetAddress.getByName(taskManagerHostname);
+ InstanceConnectionInfo connectionInfo = new
InstanceConnectionInfo(taskManagerAddress, dataport);
+
+ // ----> memory / network stack (shuffles/broadcasts), task
slots, temp directories
+
+ // we need this because many configs have been written with a
"-1" entry
+ int slots =
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ if (slots == -1) {
+ slots = 1;
+ }
+ checkConfigParameter(slots >= 1, slots,
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ "Number of task slots must be at least one.");
+
+ int numNetworkBuffers = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+ checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
"");
+
+ int pageSize = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+ // check page size of for minimum size
+ checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE,
pageSize,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Minimum memory segment size is " +
MemoryManager.MIN_PAGE_SIZE);
+ // check page size for power of two
+ checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Memory segment size must be a power of 2.");
+
+ // check whether we use heap or off-heap memory
+ final MemoryType memType;
+ if
(configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY,
false)) {
+ memType = MemoryType.OFF_HEAP;
+ } else {
+ memType = MemoryType.HEAP;
+ }
+
+ // initialize the memory segment factory accordingly
+ if (memType == MemoryType.HEAP) {
+ if (!MemorySegmentFactory.isInitialized()) {
+
MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY);
+ } else if (MemorySegmentFactory.getFactory() !=
HeapMemorySegment.FACTORY) {
+ throw new Exception("Memory type is set to heap
memory, but memory segment " +
+ "factory has been initialized for
off-heap memory segments");
+ }
+ } else {
+ if (!MemorySegmentFactory.isInitialized()) {
+
MemorySegmentFactory.initializeFactory(HybridMemorySegment.FACTORY);
+ } else if (MemorySegmentFactory.getFactory() !=
HybridMemorySegment.FACTORY) {
+ throw new Exception("Memory type is set to
off-heap memory, but memory segment " +
+ "factory has been initialized for heap
memory segments");
+ }
+ }
+
+ String[] tmpDirs = configuration.getString(
+ ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+ final NettyConfig nettyConfig;
+ if (!localTaskManagerCommunication) {
+ nettyConfig = new NettyConfig(connectionInfo.address(),
connectionInfo.dataPort(), pageSize, slots, configuration);
+ } else {
+ nettyConfig = null;
+ }
+
+ // Default spill I/O mode for intermediate results
+ String syncOrAsync = configuration.getString(
+ ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+ final IOMode ioMode;
+ if (syncOrAsync == "async") {
--- End diff --
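This doesn't work in Java: `==` compares `String` references, not their contents, so this check can be false even when the configured value is "async". You need to use `equals(..)` (e.g. `"async".equals(syncOrAsync)`). Or, in this case, you can first convert the string to an `IOMode` and then compare it with `IOMode.ASYNC`.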
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---