This is an automated email from the ASF dual-hosted git repository.
jackxu2011 pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 4dc4247d2 Performance optimization, remove the synchronization wait
operation of mark cache close #3717 (#3720)
4dc4247d2 is described below
commit 4dc4247d274efea5e628ac89bf333bb22036fa1d
Author: peacewong <[email protected]>
AuthorDate: Wed Nov 2 16:51:49 2022 +0800
Performance optimization, remove the synchronization wait operation of mark
cache close #3717 (#3720)
---
.../ecm/ComputationEngineConnManager.scala | 23 +--------
.../orchestrator/ecm/EngineConnManager.scala | 54 +++++++++++-----------
.../ecm/LoadBalanceLabelEngineConnManager.scala | 2 +-
3 files changed, 28 insertions(+), 51 deletions(-)
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
index 2d2cf556b..250e20fa6 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
@@ -63,28 +63,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
override def applyMark(markReq: MarkReq): Mark = {
if (null == markReq) return null
- val mark = MARK_CACHE_LOCKER.synchronized {
- val markCache = getMarkCache().asScala.keys
- val maybeMark = markCache.find(_.getMarkReq.equals(markReq))
- maybeMark.orNull
- }
- if (null == mark) {
- if (markReq.getLabels.containsKey(LabelKeyConstant.BIND_ENGINE_KEY)) {
- val bindEngineLabel =
MarkReq.getLabelBuilderFactory.createLabel[BindEngineLabel](
- LabelKeyConstant.BIND_ENGINE_KEY,
- markReq.getLabels.get(LabelKeyConstant.BIND_ENGINE_KEY)
- )
- if (!bindEngineLabel.getIsJobGroupHead) {
- val msg =
- s"Cannot find mark related to bindEngineLabel : ${bindEngineLabel.getStringValue}"
- logger.error(msg)
- throw new ECMPluginErrorException(ECMPluginConf.ECM_MARK_CACHE_ERROR_CODE, msg)
- }
- }
- createMark(markReq)
- } else {
- mark
- }
+ createMark(markReq)
}
override def createMark(markReq: MarkReq): Mark = {
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
index 128a915b1..de996a353 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
@@ -26,9 +26,12 @@ import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq, Policy}
import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException
import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor
+import org.apache.commons.collections.CollectionUtils
+
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
/**
*/
@@ -97,9 +100,8 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
private val engineConnExecutorCache = new util.HashMap[ServiceInstance,
EngineConnExecutor]()
- private val markCache = new util.HashMap[Mark, util.List[ServiceInstance]]()
-
- protected val MARK_CACHE_LOCKER = new Object()
+ private val markCache: util.Map[Mark, util.List[ServiceInstance]] =
+ new util.concurrent.ConcurrentHashMap[Mark, util.List[ServiceInstance]]()
override def setEngineConnApplyAttempts(attemptNumber: Int): Unit =
this.attemptNumber =
attemptNumber
@@ -148,7 +150,7 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
val executors = Utils.tryAndWarn {
instances.asScala.map(getEngineConnExecutorCache().get(_)).filter(null
!= _).sortBy {
executor =>
- if (null == executor.getRunningTaskCount) {
+ if (executor.getRunningTaskCount < 0) {
0
} else {
executor.getRunningTaskCount
@@ -162,7 +164,6 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
return Some(executors.headOption.get)
}
for (executor <- executors) {
- // todo check
if (executor.useEngineConn) {
logger.info(
s"mark ${mark.getMarkId()} Finished to getAvailableEngineConnExecutor by reuse"
@@ -189,31 +190,28 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
}
protected def addMark(mark: Mark, instances: util.List[ServiceInstance]):
Unit =
- MARK_CACHE_LOCKER.synchronized {
- if (null != mark && !getMarkCache().containsKey(mark)) {
- logger.info(s"add mark ${mark.getMarkId()}")
- getMarkCache().put(mark, instances)
- }
+ if (null != mark && !getMarkCache().containsKey(mark)) {
+ logger.info(s"add mark ${mark.getMarkId()}")
+ getMarkCache().put(mark, instances)
}
- protected def getInstances(mark: Mark): util.List[ServiceInstance] =
- MARK_CACHE_LOCKER.synchronized {
- if (null != mark && getMarkCache().containsKey(mark)) {
- getMarkCache().get(mark)
- } else {
- null
- }
- }
+ protected def getInstances(mark: Mark): util.List[ServiceInstance] = if (null != mark) {
+ getMarkCache().get(mark)
+ } else {
+ null
+ }
- protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] =
- MARK_CACHE_LOCKER.synchronized {
- getMarkCache().asScala
- .filter { keyValue =>
- keyValue._2.asScala.exists(serviceInstance.equals(_))
- }
- .keys
- .toArray
+ protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] = {
+ val markAndInstance = getMarkCache().entrySet().iterator()
+ val buffer = new ArrayBuffer[Mark]()
+ while (markAndInstance.hasNext) {
+ val next = markAndInstance.next()
+ if (next.getValue != null && next.getValue.indexOf(serviceInstance) >= 0) {
+ buffer.append(next.getKey)
+ }
}
+ buffer.toArray
+ }
override def releaseEngineConnExecutor(
engineConnExecutor: EngineConnExecutor,
@@ -251,8 +249,8 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
}
}
- protected def removeMark(mark: Mark): Unit = MARK_CACHE_LOCKER.synchronized {
- if (null != mark && getMarkCache().containsKey(mark)) {
+ protected def removeMark(mark: Mark): Unit = {
+ if (null != mark) {
getMarkCache().remove(mark)
}
}
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
index c19692ed6..622c8813b 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
@@ -228,7 +228,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
}
}
- protected def getAllInstances(): Array[String] = MARK_CACHE_LOCKER.synchronized {
+ protected def getAllInstances(): Array[String] = {
val instances = new ArrayBuffer[String]
getMarkCache()
.values()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]