This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 4bec6ad93 Modify the metrics field, replace instance with ticketId 
(#4148)
4bec6ad93 is described below

commit 4bec6ad93db4de2e0eded8b25511a144c32862a8
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Mon Feb 13 00:12:10 2023 +0800

    Modify the metrics field, replace instance with ticketId (#4148)
    
    * replace instance with ticketId
---
 .../apache/linkis/protocol/constants/TaskConstant.java   |  3 +++
 .../executor/service/TaskExecutionServiceImpl.scala      | 10 ++++++++++
 .../org/apache/linkis/entrance/EntranceServer.scala      |  1 +
 .../entrance/execute/DefaultEntranceExecutor.scala       | 16 +++++++++++-----
 .../apache/linkis/entrance/utils/JobHistoryHelper.scala  | 10 +++++-----
 .../computation/physical/CodeLogicalUnitExecTask.scala   | 13 +++++++++++++
 .../orchestrator/ecm/service/EngineConnExecutor.scala    |  2 ++
 .../ecm/service/impl/ComputationEngineConnExecutor.scala |  2 ++
 .../linkis/jobhistory/conversions/TaskConversions.scala  | 10 +++++++++-
 9 files changed, 56 insertions(+), 11 deletions(-)

diff --git 
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
 
b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
index 2844fce46..8f5a68008 100644
--- 
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
+++ 
b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
@@ -63,6 +63,9 @@ public interface TaskConstant {
 
   String ENTRANCEJOB_ENGINECONN_MAP = "engineconnMap";
   String ENGINE_INSTANCE = "engineInstance";
+  String TICKET_ID = "ticketId";
+  String ENGINE_CONN_TASK_ID = "engineConnTaskId";
+  String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime";
 
   String PARAMS_DATA_SOURCE = "dataSources";
 
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 608c2da2a..039c1060c 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -49,6 +49,7 @@ import 
org.apache.linkis.engineconn.core.executor.ExecutorManager
 import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import 
org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
 import org.apache.linkis.engineconn.executor.listener.event.EngineConnSyncEvent
+import org.apache.linkis.engineconn.launch.EngineConnServer
 import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
 import org.apache.linkis.governance.common.exception.engineconn.{
   EngineConnExecutorErrorCode,
@@ -436,6 +437,15 @@ class TaskExecutionServiceImpl
               }
             val extraInfoMap = new util.HashMap[String, Object]()
             extraInfoMap.put(TaskConstant.ENGINE_INSTANCE, 
Sender.getThisInstance)
+            extraInfoMap.put(
+              TaskConstant.TICKET_ID,
+              EngineConnServer.getEngineCreationContext.getTicketId
+            )
+            extraInfoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, task.getTaskId)
+            extraInfoMap.put(
+              TaskConstant.ENGINE_CONN_SUBMIT_TIME,
+              System.currentTimeMillis.toString
+            )
             // todo add other info
             var respRunningInfo: ResponseTaskRunningInfo = null
             if (null != resourceResponse) {
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 8b7d0d301..f298e5425 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -113,6 +113,7 @@ abstract class EntranceServer extends Logging {
           t.setProgress(EntranceJob.JOB_COMPLETED_PROGRESS.toString)
           val infoMap = new util.HashMap[String, AnyRef]
           infoMap.put(TaskConstant.ENGINE_INSTANCE, "NULL")
+          infoMap.put(TaskConstant.TICKET_ID, "")
           infoMap.put("message", "Task interception failed and cannot be 
retried")
           JobHistoryHelper.updateJobRequestMetrics(jobRequest, null, infoMap)
         case _ =>
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index d2b6d0efc..7b5e3cc3c 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -82,14 +82,20 @@ class DefaultEntranceExecutor(id: Long)
       
orchestratorFuture.operate[ProgressProcessor](DefaultProgressOperation.PROGRESS_NAME)
     progressProcessor.doOnObtain(progressInfoEvent => {
       if (null != entranceJob) {
+        // Make sure to update the database, put it in front
+        try {
+          JobHistoryHelper.updateJobRequestMetrics(
+            entranceJob.getJobRequest,
+            progressInfoEvent.resourceMap,
+            progressInfoEvent.infoMap
+          )
+        } catch {
+          case e: Exception =>
+            logger.error("update job metrics error", e)
+        }
         entranceJob.getProgressListener.foreach(
           _.onProgressUpdate(entranceJob, progressInfoEvent.progress, 
entranceJob.getProgressInfo)
         )
-        JobHistoryHelper.updateJobRequestMetrics(
-          entranceJob.getJobRequest,
-          progressInfoEvent.resourceMap,
-          progressInfoEvent.infoMap
-        )
       }
     })
     progressProcessor
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index fa13d683b..0fc7e6e48 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -190,15 +190,15 @@ object JobHistoryHelper extends Logging {
       metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, 
engineInstanceMap)
     }
     val infoMap = ecInfo
-    if (null != infoMap && infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
-      val instance = 
infoMap.get(TaskConstant.ENGINE_INSTANCE).asInstanceOf[String]
+    if (null != infoMap && infoMap.containsKey(TaskConstant.TICKET_ID)) {
+      val ticketId = infoMap.get(TaskConstant.TICKET_ID).asInstanceOf[String]
       val engineExtraInfoMap = engineInstanceMap
-        .getOrDefault(instance, new util.HashMap[String, AnyRef])
+        .getOrDefault(ticketId, new util.HashMap[String, AnyRef])
         .asInstanceOf[util.HashMap[String, AnyRef]]
       engineExtraInfoMap.putAll(infoMap)
-      engineInstanceMap.put(instance, engineExtraInfoMap)
+      engineInstanceMap.put(ticketId, engineExtraInfoMap)
     } else {
-      logger.warn("Ec info map must contains ECInstance")
+      logger.warn("Ec info map must contains ticketID")
     }
   }
 
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index 86c1cad86..bd665cd34 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -29,6 +29,7 @@ import org.apache.linkis.orchestrator.computation.execute.{
   CodeExecTaskExecutorManager
 }
 import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
+import 
org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor
 import org.apache.linkis.orchestrator.exception.{
   OrchestratorErrorCodeSummary,
   OrchestratorErrorException,
@@ -120,6 +121,18 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], 
children: Array[ExecTask
             TaskConstant.ENGINE_INSTANCE,
             codeExecutor.getEngineConnExecutor.getServiceInstance.getInstance
           )
+          infoMap.put(
+            TaskConstant.TICKET_ID,
+            // Ensure that the job metric has at least one EC record.
+            // When the EC is reuse, the same EC may have two records, One key 
is Instance, and the other key is ticketId
+            if (codeExecutor.getEngineConnExecutor.isReuse()) {
+              codeExecutor.getEngineConnExecutor.getServiceInstance.getInstance
+            } else {
+              codeExecutor.getEngineConnExecutor.getTicketId
+            }
+          )
+          infoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, engineConnExecId)
+          infoMap.put(TaskConstant.ENGINE_CONN_SUBMIT_TIME, 
System.currentTimeMillis.toString)
           val event = TaskRunningInfoEvent(
             this,
             0f,
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
index c0d225065..2052560bb 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
@@ -74,6 +74,8 @@ trait EngineConnExecutor extends Closeable {
 
   def setReuse(reuse: Boolean): EngineConnExecutor
 
+  def getTicketId: String
+
   override def equals(other: Any): Boolean = other match {
     case that: EngineConnExecutor =>
       (that canEqual this) &&
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
index 2322be2d6..cb7998e5c 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
@@ -46,6 +46,8 @@ class ComputationEngineConnExecutor(engineNode: EngineNode) 
extends AbstractEngi
 
   private def getEngineConnSender: Sender = 
Sender.getSender(getServiceInstance)
 
+  override def getTicketId: String = engineNode.getTicketId
+
   override def close(): Unit = {
     logger.info("Start to release engineConn {}", getServiceInstance)
     val requestManagerUnlock =
diff --git 
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
 
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
index 929ead32b..3ebd8ff41 100644
--- 
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
+++ 
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
@@ -292,7 +292,15 @@ object TaskConversions extends Logging {
         .get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
         .asInstanceOf[util.Map[String, Object]]
       if (null != engineMap && !engineMap.isEmpty) {
-        taskVO.setEngineInstance(engineMap.map(_._1).toList.mkString(","))
+        // the engineInstance in metrics may be repeat, so it needs to be 
distinct
+        val engineInstances =
+          engineMap.asScala
+            .map(_._2.asInstanceOf[util.Map[String, Object]])
+            .map(_.get(TaskConstant.ENGINE_INSTANCE))
+            .toList
+            .distinct
+            .mkString(",")
+        taskVO.setEngineInstance(engineInstances)
       }
     } else if (TaskStatus.Failed.toString.equals(job.getStatus)) {
       taskVO.setCanRetry(true)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to