This is an automated email from the ASF dual-hosted git repository.
sergeykamov pushed a commit to branch NLPCRAFT-111
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git
The following commit(s) were added to refs/heads/NLPCRAFT-111 by this push:
new 8f09fef WIP.
8f09fef is described below
commit 8f09fef41505f7d6f9f9a8208e9e0b206a0ac997
Author: Sergey Kamov <[email protected]>
AuthorDate: Sun Jan 24 12:44:33 2021 +0300
WIP.
---
nlpcraft/src/main/resources/nlpcraft.conf | 18 ++----
.../nlpcraft/common/pool/NCThreadPoolManager.scala | 65 ++++++++--------------
2 files changed, 28 insertions(+), 55 deletions(-)
diff --git a/nlpcraft/src/main/resources/nlpcraft.conf
b/nlpcraft/src/main/resources/nlpcraft.conf
index ed042cf..4d85d8e 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -208,13 +208,10 @@ nlpcraft {
# 'ctxword' server endpoint URL.
ctxword.url="http://localhost:5000"
- # TODO: some description (keep these names for consistemt with
ThreadPoolExecutor parameters names)
+ # TODO: some description (pool name - max thread size)
+ # min 0, keepAliveTime = 1 min
# pools = {
- # "org.apache.nlpcraft.server.rest.NCBasicRestApi" = {
- # "corePoolSize" = 2
- # "maximumPoolSize" = 4
- # "keepAliveTime" = 1000
- # }
+ # "org.apache.nlpcraft.common.extcfg.NCExternalConfigManager" = 10
# }
}
@@ -334,13 +331,10 @@ nlpcraft {
# When exceeded the request will be automatically rejected.
resultMaxSizeBytes = 1048576
- # TODO: some description (keep these names for consistemt with
ThreadPoolExecutor parameters names)
+ # TODO: some description (pool name - max thread size)
+ # min 0, keepAliveTime = 1 min
# pools = {
- # "org.apache.nlpcraft.common.extcfg.NCExternalConfigManager" = {
- # "corePoolSize" = 2.2
- # "maximumPoolSize" = 4
- # "keepAliveTime" = 1000
- # }
+ # "org.apache.nlpcraft.common.extcfg.NCExternalConfigManager" = 10
# }
}
}
diff --git
a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
index b8ffbba..dee7f6d 100644
---
a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
+++
b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
@@ -23,7 +23,6 @@ import org.apache.nlpcraft.common.module.NCModule
import org.apache.nlpcraft.common.module.NCModule._
import org.apache.nlpcraft.common.{NCE, NCService, U}
-import java.util
import java.util.concurrent._
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
@@ -32,19 +31,19 @@ import scala.concurrent.ExecutionContext
* Common thread pool manager.
*/
object NCThreadPoolManager extends NCService {
+ private final val KEEP_ALIVE_MS = 60000
+
@volatile private var data: ConcurrentHashMap[String, Holder] = new
ConcurrentHashMap
private case class Holder(context: ExecutionContext, pool:
Option[ExecutorService])
- // Names are same as parameter names of 'ThreadPoolExecutor'.
- private case class PoolCfg(corePoolSize: Int, maximumPoolSize: Int,
keepAliveTime: Long)
private object Config extends NCConfigurable {
private val module: NCModule = NCModule.getModule
val moduleName: String = module.toString.toLowerCase
- val factories: Map[String, PoolCfg] = {
- val m: Option[Map[String, util.HashMap[String, Number]]] =
+ val sizes: Map[String, Integer] = {
+ val m: Option[Map[String, Integer]] =
getMapOpt(
module match {
case SERVER ⇒ "nlpcraft.server.pools"
@@ -54,60 +53,40 @@ object NCThreadPoolManager extends NCService {
}
)
- m.getOrElse(Map.empty).map { case (name, cfg) ⇒
- val cfgMap = cfg.asScala
+ m.getOrElse(Map.empty)
+ }
- def get(prop: String): Number = {
- try
- cfgMap.getOrElse(prop, throw new NCE(s"Missing
property value '$prop' for thread pool: $name"))
- catch {
- case e: ClassCastException ⇒ throw new NCE(s"Invalid
property value '$prop' for thread pool: $name", e)
- }
- }
+ @throws[NCE]
+ def check(): Unit = {
+ val inv = sizes.filter(_._2 <= 0)
- name →
- PoolCfg(
- get("corePoolSize").intValue(),
- get("maximumPoolSize").intValue(),
- get("keepAliveTime").longValue()
- )
- }
+ if (inv.nonEmpty)
+ throw new NCE(s"Invalid pool maximum sizes for:
[${inv.keys.mkString(", ")}]")
}
}
+ Config.check()
+
def getContext(name: String): ExecutionContext =
data.computeIfAbsent(
name,
(_: String) ⇒
- Config.factories.get(name) match {
- case Some(pCfg) ⇒
- val p = new ThreadPoolExecutor(
- pCfg.corePoolSize,
- pCfg.maximumPoolSize,
- pCfg.keepAliveTime,
+ Config.sizes.get(name) match {
+ case Some(maxSize) ⇒
+ val ex = new ThreadPoolExecutor(
+ 0,
+ maxSize,
+ KEEP_ALIVE_MS,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable]
)
- logger.info(
- s"Custom executor service created [ " +
- s"name=$name, " +
- s"module=${Config.moduleName}, " +
- s"corePoolSize=${pCfg.corePoolSize}, " +
- s"maximumPoolSize=${pCfg.maximumPoolSize}, " +
- s"keepAliveTime=${pCfg.keepAliveTime}" +
- "]"
- )
+ logger.info(s"Custom executor service created for
'$name' with maxThreadSize: $maxSize.")
- Holder(ExecutionContext.fromExecutor(p), Some(p))
+ Holder(ExecutionContext.fromExecutor(ex), Some(ex))
case None ⇒
- logger.info(
- s"Default system executor service used [ " +
- s"name=$name, " +
- s"module=${Config.moduleName}" +
- "]"
- )
+ logger.info(s"Default system executor service used for
'$name'")
Holder(ExecutionContext.Implicits.global, None)
}