This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new de122dc26 linkis-computation-orchestrator - add log of engine reusing (#3852)
de122dc26 is described below

commit de122dc26767faed78d3cc80264a70cb8df470d3
Author: 成彬彬 <[email protected]>
AuthorDate: Tue Nov 22 11:53:07 2022 +0800

    linkis-computation-orchestrator - add log of engine reusing (#3852)
    
    * linkis-computation-orchestrator - add log of engine reusing
---
 .../computation/physical/CodeLogicalUnitExecTask.scala      |  6 ++++++
 .../orchestrator/ecm/ComputationEngineConnManager.scala     | 13 +++++++------
 .../orchestrator/ecm/service/EngineConnExecutor.scala       | 13 +++++++++++++
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index a741b3643..e7f98c5c6 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -97,6 +97,12 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
     if (executor.isDefined && !isCanceled) {
       val requestTask = toRequestTask
       val codeExecutor = executor.get
+      val msg = if (codeExecutor.getEngineConnExecutor.isReuse()) {
+        s"Succeed to reuse ec : ${codeExecutor.getEngineConnExecutor.getServiceInstance}"
+      } else {
+        s"Succeed to create new ec : ${codeExecutor.getEngineConnExecutor.getServiceInstance}"
+      }
+      getPhysicalContext.pushLog(TaskLogEvent(this, LogUtils.generateInfo(msg)))
       val response = Utils.tryCatch(codeExecutor.getEngineConnExecutor.execute(requestTask)) {
         t: Throwable =>
           logger.error(
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
index 29b4c14a8..2c97e36ce 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
@@ -86,7 +86,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
       count = count - 1
       val start = System.currentTimeMillis()
       try {
-        val engineNode = getEngineNodeAskManager(engineAskRequest, mark)
+        val (engineNode, reuse) = getEngineNodeAskManager(engineAskRequest, mark)
         if (null != engineNode) {
           val engineConnExecutor =
             if (
@@ -100,6 +100,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
           if (null != engineNode.getLabels) {
             engineConnExecutor.setLabels(engineNode.getLabels.asScala.toList.toArray)
           }
+          engineConnExecutor.setReuse(reuse)
           return engineConnExecutor
         }
       } catch {
@@ -128,7 +129,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
   private def getEngineNodeAskManager(
       engineAskRequest: EngineAskRequest,
       mark: Mark
-  ): EngineNode = {
+  ): (EngineNode, Boolean) = {
     val response = Utils.tryCatch(getManagerSender().ask(engineAskRequest)) { t: Throwable =>
       val baseMsg = s"mark ${mark.getMarkId()}  failed to ask linkis Manager Can be retried "
       ExceptionUtils.getRootCause(t) match {
@@ -144,8 +145,8 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
     }
     response match {
       case engineNode: EngineNode =>
-        logger.debug("Succeed to get engineNode {} mark {}", engineNode: Any, mark.getMarkId(): Any)
-        engineNode
+        logger.debug(s"Succeed to reuse engineNode $engineNode mark ${mark.getMarkId()}")
+        (engineNode, true)
       case EngineAskAsyncResponse(id, serviceInstance) =>
         logger.info(
           "{} received EngineAskAsyncResponse id: {} serviceInstance: {}",
@@ -160,7 +161,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
               "{} async id: {} success to async get EngineNode {}",
               Array(mark.getMarkId(), id, engineNode): _*
             )
-            engineNode
+            (engineNode, false)
           case EngineCreateError(id, exception, retry) =>
             logger.debug(
               "{} async id: {} Failed  to async get EngineNode, {}",
@@ -184,7 +185,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
           mark.getMarkId(): Any,
           engineAskRequest: Any
         )
-        null
+        (null, false)
     }
   }
 
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
index 10395dcf7..c0d225065 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
@@ -70,6 +70,10 @@ trait EngineConnExecutor extends Closeable {
 
   def updateLastUpdateTime(): Unit
 
+  def isReuse(): Boolean
+
+  def setReuse(reuse: Boolean): EngineConnExecutor
+
   override def equals(other: Any): Boolean = other match {
     case that: EngineConnExecutor =>
       (that canEqual this) &&
@@ -99,6 +103,8 @@ abstract class AbstractEngineConnExecutor extends EngineConnExecutor with Loggin
   private val runningTask: util.Map[String, RequestTask] =
     new ConcurrentHashMap[String, RequestTask]()
 
+  private var reuse: Boolean = false
+
   override def getLastUpdateTime(): Long = lastUpdateTime
 
   override def updateLastUpdateTime(): Unit = lastUpdateTime = System.currentTimeMillis()
@@ -123,4 +129,11 @@ abstract class AbstractEngineConnExecutor extends EngineConnExecutor with Loggin
     }
   }
 
+  override def isReuse(): Boolean = reuse
+
+  override def setReuse(reuse: Boolean): EngineConnExecutor = {
+    this.reuse = reuse
+    this
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to