This is an automated email from the ASF dual-hosted git repository.
aradzinski pushed a commit to branch NLPCRAFT-477
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git
The following commit(s) were added to refs/heads/NLPCRAFT-477 by this push:
new 9138305 Create NCConversationManager.scala
9138305 is described below
commit 91383058e3119f05554c76d7f0bc5909f37ec61d
Author: Aaron Radzinski <[email protected]>
AuthorDate: Wed Feb 2 09:31:23 2022 -0800
Create NCConversationManager.scala
---
.../conversation/NCConversationManager.scala | 111 +++++++++++++++++++++
1 file changed, 111 insertions(+)
diff --git
a/nlpcraft/src/main/scala/org/apache/nlpcraft/internal/conversation/NCConversationManager.scala
b/nlpcraft/src/main/scala/org/apache/nlpcraft/internal/conversation/NCConversationManager.scala
new file mode 100644
index 0000000..d916e37
--- /dev/null
+++
b/nlpcraft/src/main/scala/org/apache/nlpcraft/internal/conversation/NCConversationManager.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nlpcraft.internal.conversation
+
+import io.opencensus.trace.Span
+import org.apache.nlpcraft.common._
+import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
+
+import scala.collection._
+
+/**
+ * Conversation manager: keeps one [[NCConversation]] per (user ID, model ID) key and evicts timed-out entries on a background thread.
+ */
+object NCConversationManager:
+ case class Key(usrId: Long, mdlId: String) // Map key: user ID plus model ID.
+ case class Value(conv: NCConversation, var tstamp: Long = 0) // Cached conversation and the timestamp set on each getConversation() call.
+
+ private final val convs: mutable.Map[Key, Value] =
mutable.HashMap.empty[Key, Value] // Accessed under `convs.synchronized` in the GC loop and in getConversation().
+
+ @volatile private var gc: Thread = _ // Background eviction thread; assigned in start(), reset to null in stop().
+
+ /**
+ * Creates and starts the "conversation-manager-gc" thread, which repeatedly evicts timed-out conversations and then waits on `convs` until the next expected timeout or a `notifyAll()`.
+ * @return NOTE(review): declared as NCService, but the last expression is `gc.start()` - confirm what is actually returned.
+ */
+ def start(): NCService =
+ gc = NCUtils.mkThread("conversation-manager-gc") { t =>
+ while (!t.isInterrupted)
+ try
+ convs.synchronized {
+ val sleepTime = clearForTimeout() - U.now() // Time left until the earliest remaining timeout.
+ if sleepTime > 0 then convs.wait(sleepTime) // May be woken early by notifyAll() in getConversation().
+ }
+ catch
+ case _: InterruptedException => // No-op; loop condition is re-checked. NOTE(review): relies on t.isInterrupted staying true after the exception - confirm in NCUtils.mkThread.
+ case e: Throwable => U.prettyError(logger, s"Unexpected
error for thread: ${t.getName}", e)
+ }
+ gc.start()
+
+ /**
+ * Stops the GC thread via `NCUtils.stopThread`, resets the thread reference to null and removes all cached conversations.
+ */
+ def stop(): Unit =
+ NCUtils.stopThread(gc)
+ gc = null
+ convs.clear() // NOTE(review): not inside `convs.synchronized`, unlike the other accesses - confirm this is safe.
+
+ /**
+ * Evicts expired conversations (caller must hold the `convs` lock) and returns the earliest `tstamp + timeoutMs` among the remaining ones, or `Long.MaxValue` if none remain.
+ */
+ private def clearForTimeout(): Long =
+ require(Thread.holdsLock(convs))
+
+ val now = U.now()
+ val delKeys = mutable.HashSet.empty[Key]
+
+ for ((key, value) <- convs)
+ val del =
+ NCModelManager.getModelOpt(key.mdlId) match
+ case Some(mdl) => value.tstamp < now -
mdl.model.getConversationTimeout
+ case None => true // Model not found: evict its conversations.
+
+ if del then
+ value.conv.getUserData.clear()
+ delKeys += key
+
+ convs --= delKeys // Removal is deferred until after the iteration over `convs`.
+
+ if convs.nonEmpty then convs.values.map(v => v.tstamp +
v.conv.timeoutMs).min
+ else Long.MaxValue
+
+ /**
+ * Gets conversation for given key, creating it on first use, and refreshes its timestamp.
+ * @param usrId User ID.
+ * @param mdlId Model ID.
+ * @param parent Optional parent span passed to `startScopedSpan` (defaults to null).
+ * @return New or existing conversation.
+ */
+ def getConversation(usrId: Long, mdlId: String, parent: Span = null):
NCConversation =
+ startScopedSpan("getConversation", parent, "usrId" -> usrId, "mdlId"
-> mdlId) { _ =>
+ val mdl = NCModelManager.getModel(mdlId).model
+
+ convs.synchronized {
+ val v = convs.getOrElseUpdate(
+ Key(usrId, mdlId),
+ Value(NCConversation(usrId, mdlId,
mdl.getConversationTimeout, mdl.getConversationDepth))
+ )
+
+ v.tstamp = U.nowUtcMs() // Record this access as the conversation's latest timestamp.
+
+ convs.notifyAll() // Wake the GC thread waiting on `convs` so it recomputes its wait time.
+
+ v.conv
+ }
+ }
+} // NOTE(review): this closing brace appears unmatched given the colon-style `object NCConversationManager:` header - confirm.