This is an automated email from the ASF dual-hosted git repository.

sergeykamov pushed a commit to branch NLPCRAFT-111
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git


The following commit(s) were added to refs/heads/NLPCRAFT-111 by this push:
     new 8672bab  WIP.
8672bab is described below

commit 8672bab8981f8894a3ed1c8203d764d6c1d48cf9
Author: Sergey Kamov <[email protected]>
AuthorDate: Thu Jan 21 23:09:44 2021 +0300

    WIP.
---
 nlpcraft/src/main/resources/nlpcraft.conf          | 18 ++++++++-
 .../common/extcfg/NCExternalConfigManager.scala    | 30 +++++++++++---
 .../apache/nlpcraft/common/pool/NCPoolFactory.java |  7 ----
 .../nlpcraft/common/pool/NCPoolManager.scala       | 47 +++++++++++++++++-----
 4 files changed, 78 insertions(+), 24 deletions(-)

diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index b67ad76..b60ed14 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -208,7 +208,14 @@ nlpcraft {
         # 'ctxword' server endpoint URL.
         ctxword.url="http://localhost:5000";
 
-        # pools =
+        # Optional thread pool settings keyed by component class name; 'keepAliveTime' is in milliseconds.
+        # pools = {
+        #     "org.apache.nlpcraft.server.rest.NCBasicRestApi" = {
+        #         "corePoolSize" = 2
+        #         "maximumPoolSize" = 4
+        #         "keepAliveTime" = 1000
+        #     }
+        # }
     }
 
     # Basic NLP toolkit to use on both server and probes. Possible values:
@@ -327,6 +334,13 @@ nlpcraft {
         # When exceeded the request will be automatically rejected.
         resultMaxSizeBytes = 1048576
 
-        # pools =
+        # Optional thread pool settings keyed by component class name; 'keepAliveTime' is in milliseconds.
+        # pools = {
+        #     "org.apache.nlpcraft.common.extcfg.NCExternalConfigManager" = {
+        #         "corePoolSize" = 2
+        #         "maximumPoolSize" = 4
+        #         "keepAliveTime" = 1000
+        #     }
+        # }
     }
 }
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala
index f34eb81..d56c80d 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/extcfg/NCExternalConfigManager.scala
@@ -21,22 +21,24 @@ import java.io._
 import java.net.URL
 import java.nio.file.Files
 import java.util.concurrent.ConcurrentHashMap
-
 import io.opencensus.trace.Span
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.commons.io.IOUtils
 import org.apache.nlpcraft.common.config.NCConfigurable
 import org.apache.nlpcraft.common.extcfg.NCExternalConfigType._
+import org.apache.nlpcraft.common.module.NCModule
+import org.apache.nlpcraft.common.module.NCModule.{PROBE, SERVER}
+import org.apache.nlpcraft.common.pool.NCPoolContext
 import org.apache.nlpcraft.common.{NCE, NCService, U}
 import resource.managed
+
 import scala.collection.JavaConverters._
 import scala.io.Source
-import scala.concurrent.ExecutionContext.Implicits.global
 
 /**
   * External configuration manager.
   */
-object NCExternalConfigManager extends NCService {
+object NCExternalConfigManager extends NCService with NCPoolContext {
     private final val DFLT_DIR = ".nlpcraft/extcfg"
     private final val MD5_FILE = "md5.txt"
 
@@ -65,6 +67,14 @@ object NCExternalConfigManager extends NCService {
             )
         )
 
+    private val FILES_BY_MODULES =
+        Map(
+            GEO → Seq(SERVER),
+            BADFILTER → Seq(SERVER, PROBE),
+            SPELL → Seq(SERVER),
+            OPENNLP → Seq(SERVER, PROBE)
+        )
+
     private object Config extends NCConfigurable {
         val url: String = getString("nlpcraft.extConfig.extUrl")
         val checkMd5: Boolean = getBool("nlpcraft.extConfig.checkMd5")
@@ -151,12 +161,21 @@ object NCExternalConfigManager extends NCService {
     override def start(parent: Span): NCService = startScopedSpan("start", parent) { _ ⇒
         ackStarting()
 
+        require(FILES.size == FILES_BY_MODULES.size)
         require(NCExternalConfigType.values.forall(FILES.contains))
 
+        val module = NCModule.getModule
+
+        val moduleFiles = FILES.filter(p ⇒ FILES_BY_MODULES(p._1).contains(module))
+
         val m = new ConcurrentHashMap[NCResourceType, File]
 
         U.executeParallel(
-            NCExternalConfigType.values.flatMap(t ⇒ FILES(t).map(FileHolder(_, t))).toSeq.map(f ⇒ () ⇒ processFile(f, m)): _*
+            moduleFiles.keys.
+                flatMap(t ⇒ moduleFiles(t).
+                map(FileHolder(_, t))).
+                toSeq.
+                map(f ⇒ () ⇒ processFile(f, m)): _*
         )
 
         val downTypes = m.asScala
@@ -166,7 +185,8 @@ object NCExternalConfigManager extends NCService {
                 downTypes.values.toSeq.map(d ⇒ () ⇒ clearDir(d)): _*
             )
             U.executeParallel(
-                downTypes.keys.toSeq.flatMap(t ⇒ FILES(t).toSeq.map(f ⇒ Download(f, t))).map(d ⇒ () ⇒ download(d)): _*
+                downTypes.keys.toSeq.
+                    flatMap(t ⇒ moduleFiles(t).toSeq.map(f ⇒ Download(f, t))).map(d ⇒ () ⇒ download(d)): _*
             )
         }
 
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolFactory.java b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolFactory.java
deleted file mode 100644
index 1879080..0000000
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolFactory.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.nlpcraft.common.pool;
-
-import java.util.concurrent.ExecutorService;
-
-public interface NCPoolFactory {
-    ExecutorService mkExecutorService();
-}
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala
index 5446bf5..838ca9f 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCPoolManager.scala
@@ -21,9 +21,10 @@ import io.opencensus.trace.Span
 import org.apache.nlpcraft.common.config.NCConfigurable
 import org.apache.nlpcraft.common.module.NCModule
 import org.apache.nlpcraft.common.module.NCModule._
-import org.apache.nlpcraft.common.{NCService, U}
+import org.apache.nlpcraft.common.{NCE, NCService, U}
 
-import java.util.concurrent.{ConcurrentHashMap, ExecutorService}
+import java.util
+import java.util.concurrent._
 import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext
 
@@ -34,14 +35,15 @@ object NCPoolManager extends NCService {
     @volatile private var data: ConcurrentHashMap[String, Holder] = new ConcurrentHashMap
 
     private case class Holder(context: ExecutionContext, pool: Option[ExecutorService])
+    private case class PoolCfg(corePoolSize: Int, maximumPoolSize: Int, keepAliveTime: Long)
 
     private object Config extends NCConfigurable {
         private val module: NCModule = NCModule.getModule
 
         val moduleName: String = module.toString.toLowerCase
 
-        val factories: Map[String, NCPoolFactory] = {
-            val m: Option[Map[String, String]] =
+        val factories: Map[String, PoolCfg] = {
+            val m: Option[Map[String, util.HashMap[String, Number]]] =
                 getMapOpt(
                     module match {
                         case SERVER ⇒ "nlpcraft.server.pools"
@@ -50,7 +52,23 @@ object NCPoolManager extends NCService {
                     }
                 )
 
-            m.getOrElse(Map.empty).map(p ⇒ p._1 → U.mkObject(p._2))
+            m.getOrElse(Map.empty).map { case (poolName, poolCfg) ⇒
+                def get(name: String): Number = {
+                    val v = poolCfg.get(name)
+
+                    if (v == null)
+                        throw new NCE(s"Missed value '$name' for pool '$poolName'")
+
+                    v
+                }
+
+                poolName →
+                    PoolCfg(
+                        get("corePoolSize").intValue(),
+                        get("maximumPoolSize").intValue(),
+                        get("keepAliveTime").longValue()
+                    )
+            }
         }
     }
 
@@ -59,17 +77,26 @@ object NCPoolManager extends NCService {
             name,
             (_: String) ⇒
                 Config.factories.get(name) match {
-                    case Some(f) ⇒
-                        val p = f.mkExecutorService()
+                    case Some(poolCfg) ⇒
+                        val p = new ThreadPoolExecutor(
+                            poolCfg.corePoolSize,
+                            poolCfg.maximumPoolSize,
+                            poolCfg.keepAliveTime,
+                            TimeUnit.MILLISECONDS,
+                            new LinkedBlockingQueue[Runnable]
+                        )
 
                         logger.info(
-                            s"Executor service created with factory ${f.getClass.getName} for $name, " +
-                            s"module: ${Config.moduleName}."
+                            s"Executor service created `$name" +
+                            s", corePoolSize=${poolCfg.corePoolSize}" +
+                            s", maximumPoolSize=${poolCfg.maximumPoolSize}" +
+                            s", keepAliveTime=${poolCfg.keepAliveTime}" +
+                            s", module: ${Config.moduleName}"
                         )
 
                         Holder(ExecutionContext.fromExecutor(p), Some(p))
                     case None ⇒
-                        logger.info(s"System executor service used for $name, module: ${Config.moduleName}.")
+                        logger.info(s"System executor service used for `$name`, module: `${Config.moduleName}.")
 
                         Holder(ExecutionContext.Implicits.global, None)
                 }

Reply via email to