This is an automated email from the ASF dual-hosted git repository.

jackxu2011 pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 4dc4247d2 Performance optimization, remove the synchronization wait operation of mark cache close #3717 (#3720)
4dc4247d2 is described below

commit 4dc4247d274efea5e628ac89bf333bb22036fa1d
Author: peacewong <[email protected]>
AuthorDate: Wed Nov 2 16:51:49 2022 +0800

    Performance optimization, remove the synchronization wait operation of mark cache close #3717 (#3720)
---
 .../ecm/ComputationEngineConnManager.scala         | 23 +--------
 .../orchestrator/ecm/EngineConnManager.scala       | 54 +++++++++++-----------
 .../ecm/LoadBalanceLabelEngineConnManager.scala    |  2 +-
 3 files changed, 28 insertions(+), 51 deletions(-)

diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
index 2d2cf556b..250e20fa6 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
@@ -63,28 +63,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
 
   override def applyMark(markReq: MarkReq): Mark = {
     if (null == markReq) return null
-    val mark = MARK_CACHE_LOCKER.synchronized {
-      val markCache = getMarkCache().asScala.keys
-      val maybeMark = markCache.find(_.getMarkReq.equals(markReq))
-      maybeMark.orNull
-    }
-    if (null == mark) {
-      if (markReq.getLabels.containsKey(LabelKeyConstant.BIND_ENGINE_KEY)) {
-        val bindEngineLabel = 
MarkReq.getLabelBuilderFactory.createLabel[BindEngineLabel](
-          LabelKeyConstant.BIND_ENGINE_KEY,
-          markReq.getLabels.get(LabelKeyConstant.BIND_ENGINE_KEY)
-        )
-        if (!bindEngineLabel.getIsJobGroupHead) {
-          val msg =
-            s"Cannot find mark related to bindEngineLabel : 
${bindEngineLabel.getStringValue}"
-          logger.error(msg)
-          throw new 
ECMPluginErrorException(ECMPluginConf.ECM_MARK_CACHE_ERROR_CODE, msg)
-        }
-      }
-      createMark(markReq)
-    } else {
-      mark
-    }
+    createMark(markReq)
   }
 
   override def createMark(markReq: MarkReq): Mark = {
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
index 128a915b1..de996a353 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
@@ -26,9 +26,12 @@ import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq, Policy}
 import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException
 import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor
 
+import org.apache.commons.collections.CollectionUtils
+
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 /**
  */
@@ -97,9 +100,8 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
 
   private val engineConnExecutorCache = new util.HashMap[ServiceInstance, 
EngineConnExecutor]()
 
-  private val markCache = new util.HashMap[Mark, util.List[ServiceInstance]]()
-
-  protected val MARK_CACHE_LOCKER = new Object()
+  private val markCache: util.Map[Mark, util.List[ServiceInstance]] =
+    new util.concurrent.ConcurrentHashMap[Mark, util.List[ServiceInstance]]()
 
   override def setEngineConnApplyAttempts(attemptNumber: Int): Unit = 
this.attemptNumber =
     attemptNumber
@@ -148,7 +150,7 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
       val executors = Utils.tryAndWarn {
         instances.asScala.map(getEngineConnExecutorCache().get(_)).filter(null 
!= _).sortBy {
           executor =>
-            if (null == executor.getRunningTaskCount) {
+            if (executor.getRunningTaskCount < 0) {
               0
             } else {
               executor.getRunningTaskCount
@@ -162,7 +164,6 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
           return Some(executors.headOption.get)
         }
         for (executor <- executors) {
-          // todo check
           if (executor.useEngineConn) {
             logger.info(
               s"mark ${mark.getMarkId()} Finished to   
getAvailableEngineConnExecutor by reuse"
@@ -189,31 +190,28 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
   }
 
   protected def addMark(mark: Mark, instances: util.List[ServiceInstance]): 
Unit =
-    MARK_CACHE_LOCKER.synchronized {
-      if (null != mark && !getMarkCache().containsKey(mark)) {
-        logger.info(s"add mark ${mark.getMarkId()}")
-        getMarkCache().put(mark, instances)
-      }
+    if (null != mark && !getMarkCache().containsKey(mark)) {
+      logger.info(s"add mark ${mark.getMarkId()}")
+      getMarkCache().put(mark, instances)
     }
 
-  protected def getInstances(mark: Mark): util.List[ServiceInstance] =
-    MARK_CACHE_LOCKER.synchronized {
-      if (null != mark && getMarkCache().containsKey(mark)) {
-        getMarkCache().get(mark)
-      } else {
-        null
-      }
-    }
+  protected def getInstances(mark: Mark): util.List[ServiceInstance] = if 
(null != mark) {
+    getMarkCache().get(mark)
+  } else {
+    null
+  }
 
-  protected def getMarksByInstance(serviceInstance: ServiceInstance): 
Array[Mark] =
-    MARK_CACHE_LOCKER.synchronized {
-      getMarkCache().asScala
-        .filter { keyValue =>
-          keyValue._2.asScala.exists(serviceInstance.equals(_))
-        }
-        .keys
-        .toArray
+  protected def getMarksByInstance(serviceInstance: ServiceInstance): 
Array[Mark] = {
+    val markAndInstance = getMarkCache().entrySet().iterator()
+    val buffer = new ArrayBuffer[Mark]()
+    while (markAndInstance.hasNext) {
+      val next = markAndInstance.next()
+      if (next.getValue != null && next.getValue.indexOf(serviceInstance) >= 
0) {
+        buffer.append(next.getKey)
+      }
     }
+    buffer.toArray
+  }
 
   override def releaseEngineConnExecutor(
       engineConnExecutor: EngineConnExecutor,
@@ -251,8 +249,8 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
     }
   }
 
-  protected def removeMark(mark: Mark): Unit = MARK_CACHE_LOCKER.synchronized {
-    if (null != mark && getMarkCache().containsKey(mark)) {
+  protected def removeMark(mark: Mark): Unit = {
+    if (null != mark) {
       getMarkCache().remove(mark)
     }
   }
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
index c19692ed6..622c8813b 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
@@ -228,7 +228,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
     }
   }
 
-  protected def getAllInstances(): Array[String] = 
MARK_CACHE_LOCKER.synchronized {
+  protected def getAllInstances(): Array[String] = {
     val instances = new ArrayBuffer[String]
     getMarkCache()
       .values()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to