This is an automated email from the ASF dual-hosted git repository.
sergeykamov pushed a commit to branch NLPCRAFT-111
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git
The following commit(s) were added to refs/heads/NLPCRAFT-111 by this push:
new 8672bab WIP.
8672bab is described below
commit 8672bab8981f8894a3ed1c8203d764d6c1d48cf9
Author: Sergey Kamov <[email protected]>
AuthorDate: Thu Jan 21 23:09:44 2021 +0300
WIP.
---
nlpcraft/src/main/resources/nlpcraft.conf | 18 ++++++++-
.../common/extcfg/NCExternalConfigManager.scala | 30 +++++++++++---
.../apache/nlpcraft/common/pool/NCPoolFactory.java | 7 ----
.../nlpcraft/common/pool/NCPoolManager.scala | 47 +++++++++++++++++-----
4 files changed, 78 insertions(+), 24 deletions(-)
diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index b67ad76..b60ed14 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -208,7 +208,14 @@ nlpcraft {
# 'ctxword' server endpoint URL.
ctxword.url="http://localhost:5000"
- # pools =
+ # TODO: some description
+ # pools = {
+ # "org.apache.nlpcraft.server.rest.NCBasicRestApi" = {
+ # "corePoolSize" = 2
+ # "maximumPoolSize" = 4
+ # "keepAliveTime" = 1000
+ # }
+ # }
}
# Basic NLP toolkit to use on both server and probes. Possible values:
@@ -327,6 +334,13 @@ nlpcraft {
# When exceeded the request will be automatically rejected.
resultMaxSizeBytes = 1048576
- # pools =
+ # TODO: some description
+ # pools = {
+ # "org.apache.nlpcraft.common.extcfg.NCExternalConfigManager" = {
+ # "corePoolSize" = 2
+ # "maximumPoolSize" = 4
+ # "keepAliveTime" = 1000
+ # }
+ # }
}
}
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala
index f34eb81..d56c80d 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala
@@ -21,22 +21,24 @@ import java.io._
import java.net.URL
import java.nio.file.Files
import java.util.concurrent.ConcurrentHashMap
-
import io.opencensus.trace.Span
import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.io.IOUtils
import org.apache.nlpcraft.common.config.NCConfigurable
import org.apache.nlpcraft.common.extcfg.NCExternalConfigType._
+import org.apache.nlpcraft.common.module.NCModule
+import org.apache.nlpcraft.common.module.NCModule.{PROBE, SERVER}
+import org.apache.nlpcraft.common.pool.NCPoolContext
import org.apache.nlpcraft.common.{NCE, NCService, U}
import resource.managed
+
import scala.collection.JavaConverters._
import scala.io.Source
-import scala.concurrent.ExecutionContext.Implicits.global
/**
* External configuration manager.
*/
-object NCExternalConfigManager extends NCService {
+object NCExternalConfigManager extends NCService with NCPoolContext {
private final val DFLT_DIR = ".nlpcraft/extcfg"
private final val MD5_FILE = "md5.txt"
@@ -65,6 +67,14 @@ object NCExternalConfigManager extends NCService {
)
)
+ private val FILES_BY_MODULES =
+ Map(
+ GEO → Seq(SERVER),
+ BADFILTER → Seq(SERVER, PROBE),
+ SPELL → Seq(SERVER),
+ OPENNLP → Seq(SERVER, PROBE)
+ )
+
private object Config extends NCConfigurable {
val url: String = getString("nlpcraft.extConfig.extUrl")
val checkMd5: Boolean = getBool("nlpcraft.extConfig.checkMd5")
@@ -151,12 +161,21 @@ object NCExternalConfigManager extends NCService {
override def start(parent: Span): NCService = startScopedSpan("start",
parent) { _ ⇒
ackStarting()
+ require(FILES.size == FILES_BY_MODULES.size)
require(NCExternalConfigType.values.forall(FILES.contains))
+ val module = NCModule.getModule
+
+ val moduleFiles = FILES.filter(p ⇒
FILES_BY_MODULES(p._1).contains(module))
+
val m = new ConcurrentHashMap[NCResourceType, File]
U.executeParallel(
- NCExternalConfigType.values.flatMap(t ⇒ FILES(t).map(FileHolder(_,
t))).toSeq.map(f ⇒ () ⇒ processFile(f, m)): _*
+ moduleFiles.keys.
+ flatMap(t ⇒ moduleFiles(t).
+ map(FileHolder(_, t))).
+ toSeq.
+ map(f ⇒ () ⇒ processFile(f, m)): _*
)
val downTypes = m.asScala
@@ -166,7 +185,8 @@ object NCExternalConfigManager extends NCService {
downTypes.values.toSeq.map(d ⇒ () ⇒ clearDir(d)): _*
)
U.executeParallel(
- downTypes.keys.toSeq.flatMap(t ⇒ FILES(t).toSeq.map(f ⇒
Download(f, t))).map(d ⇒ () ⇒ download(d)): _*
+ downTypes.keys.toSeq.
+ flatMap(t ⇒ moduleFiles(t).toSeq.map(f ⇒ Download(f,
t))).map(d ⇒ () ⇒ download(d)): _*
)
}
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolFactory.java b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolFactory.java
deleted file mode 100644
index 1879080..0000000
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolFactory.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.nlpcraft.common.pool;
-
-import java.util.concurrent.ExecutorService;
-
-public interface NCPoolFactory {
- ExecutorService mkExecutorService();
-}
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala
index 5446bf5..838ca9f 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala
@@ -21,9 +21,10 @@ import io.opencensus.trace.Span
import org.apache.nlpcraft.common.config.NCConfigurable
import org.apache.nlpcraft.common.module.NCModule
import org.apache.nlpcraft.common.module.NCModule._
-import org.apache.nlpcraft.common.{NCService, U}
+import org.apache.nlpcraft.common.{NCE, NCService, U}
-import java.util.concurrent.{ConcurrentHashMap, ExecutorService}
+import java.util
+import java.util.concurrent._
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
@@ -34,14 +35,15 @@ object NCPoolManager extends NCService {
@volatile private var data: ConcurrentHashMap[String, Holder] = new
ConcurrentHashMap
private case class Holder(context: ExecutionContext, pool:
Option[ExecutorService])
+ private case class PoolCfg(corePoolSize: Int, maximumPoolSize: Int,
keepAliveTime: Long)
private object Config extends NCConfigurable {
private val module: NCModule = NCModule.getModule
val moduleName: String = module.toString.toLowerCase
- val factories: Map[String, NCPoolFactory] = {
- val m: Option[Map[String, String]] =
+ val factories: Map[String, PoolCfg] = {
+ val m: Option[Map[String, util.HashMap[String, Number]]] =
getMapOpt(
module match {
case SERVER ⇒ "nlpcraft.server.pools"
@@ -50,7 +52,23 @@ object NCPoolManager extends NCService {
}
)
- m.getOrElse(Map.empty).map(p ⇒ p._1 → U.mkObject(p._2))
+ m.getOrElse(Map.empty).map { case (poolName, poolCfg) ⇒
+ def get(name: String): Number = {
+ val v = poolCfg.get(name)
+
+ if (v == null)
+ throw new NCE(s"Missed value '$name' for pool
'$poolName'")
+
+ v
+ }
+
+ poolName →
+ PoolCfg(
+ get("corePoolSize").intValue(),
+ get("maximumPoolSize").intValue(),
+ get("keepAliveTime").longValue()
+ )
+ }
}
}
@@ -59,17 +77,26 @@ object NCPoolManager extends NCService {
name,
(_: String) ⇒
Config.factories.get(name) match {
- case Some(f) ⇒
- val p = f.mkExecutorService()
+ case Some(poolCfg) ⇒
+ val p = new ThreadPoolExecutor(
+ poolCfg.corePoolSize,
+ poolCfg.maximumPoolSize,
+ poolCfg.keepAliveTime,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue[Runnable]
+ )
logger.info(
- s"Executor service created with factory
${f.getClass.getName} for $name, " +
- s"module: ${Config.moduleName}."
+ s"Executor service created `$name" +
+ s", corePoolSize=${poolCfg.corePoolSize}" +
+ s", maximumPoolSize=${poolCfg.maximumPoolSize}" +
+ s", keepAliveTime=${poolCfg.keepAliveTime}" +
+ s", module: ${Config.moduleName}"
)
Holder(ExecutionContext.fromExecutor(p), Some(p))
case None ⇒
- logger.info(s"System executor service used for $name,
module: ${Config.moduleName}.")
+ logger.info(s"System executor service used for
`$name`, module: `${Config.moduleName}.")
Holder(ExecutionContext.Implicits.global, None)
}