This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 652908519eed YARN-11588. [Federation] [Addendum] Fix uncleaned threads
in yarn router thread pool executor. (#6222)
652908519eed is described below
commit 652908519eed5fe79b696e97cc62f2014387be31
Author: slfan1989 <[email protected]>
AuthorDate: Fri Oct 27 04:39:06 2023 +0800
YARN-11588. [Federation] [Addendum] Fix uncleaned threads in yarn router
thread pool executor. (#6222)
---
.../org/apache/hadoop/yarn/conf/YarnConfiguration.java | 16 ++++++++++++++++
.../src/main/resources/yarn-default.xml | 17 +++++++++++++++++
.../router/clientrm/FederationClientInterceptor.java | 8 +++++++-
.../router/rmadmin/FederationRMAdminInterceptor.java | 14 +++++++++++++-
4 files changed, 53 insertions(+), 2 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 90a8978a228b..2a204519228a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4369,6 +4369,22 @@ public class YarnConfiguration extends Configuration {
public static final long
DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME =
TimeUnit.SECONDS.toMillis(0); // 0s
+ /**
+ * Whether core threads of the Router's user client thread pool may time out
+ * and terminate when no tasks arrive within the keep-alive time.
+ * When false, core threads are never terminated due to a lack of tasks.
+ * When true, the keep-alive policy that applies to non-core threads also
+ * applies to core threads.
+ * This setting only takes effect when
+ * {@link #ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME} is greater than 0;
+ * ThreadPoolExecutor rejects core-thread time-out with a zero keep-alive.
+ */
+ public static final String
ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
+ ROUTER_PREFIX +
"interceptor.user-thread-pool.allow-core-thread-time-out";
+
+ public static final boolean
DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
+ false;
+
/** The address of the Router web application. */
public static final String ROUTER_WEBAPP_ADDRESS =
ROUTER_WEBAPP_PREFIX + "address";
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9991e841d74b..72e8cc70f874 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5139,6 +5139,23 @@
</description>
</property>
+ <property>
+
<name>yarn.router.interceptor.user-thread-pool.allow-core-thread-time-out</name>
+ <value>false</value>
+ <description>
+ Whether core threads of the Router's user client thread pool may time out
+ and terminate when no tasks arrive within the keep-alive time.
+ When set to false, core threads are never terminated due to a lack of tasks.
+ When set to true, the keep-alive policy that applies to non-core threads
+ also applies to core threads.
+ This setting only takes effect when
+ yarn.router.interceptor.user-thread-pool.keep-alive-time is greater than 0;
+ otherwise core threads never time out. A keep-alive time greater than 0
+ also prevents constant thread replacement.
+ </description>
+ </property>
+
<property>
<name>yarn.router.submit.interval.time</name>
<value>10ms</value>
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 9c3f9971d8c7..35b3e6eeb2bd 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -231,7 +231,13 @@ public class FederationClientInterceptor
keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);
// Adding this line so that unused user threads will exit and be cleaned
up if idle for too long
- this.executorService.allowCoreThreadTimeOut(true);
+ boolean allowCoreThreadTimeOut = getConf().getBoolean(
+
YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+
+ if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
+ this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+ }
final Configuration conf = this.getConf();
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
index b7c1462a60d5..d269cfe0971c 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
@@ -130,9 +130,21 @@ public class FederationRMAdminInterceptor extends
AbstractRMAdminRequestIntercep
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router RMAdminClient-" + userName + "-%d
").build();
+ long keepAliveTime = getConf().getTimeDuration(
+ YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME,
+
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME,
TimeUnit.SECONDS);
+
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
- 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+ keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+
+ boolean allowCoreThreadTimeOut = getConf().getBoolean(
+
YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+
+ if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
+ this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+ }
federationFacade = FederationStateStoreFacade.getInstance(this.getConf());
this.conf = this.getConf();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]