This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 4bec6ad93 Modify the metrics field, replace instance with ticketId
(#4148)
4bec6ad93 is described below
commit 4bec6ad93db4de2e0eded8b25511a144c32862a8
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Mon Feb 13 00:12:10 2023 +0800
Modify the metrics field, replace instance with ticketId (#4148)
* replace instance with ticketId
---
.../apache/linkis/protocol/constants/TaskConstant.java | 3 +++
.../executor/service/TaskExecutionServiceImpl.scala | 10 ++++++++++
.../org/apache/linkis/entrance/EntranceServer.scala | 1 +
.../entrance/execute/DefaultEntranceExecutor.scala | 16 +++++++++++-----
.../apache/linkis/entrance/utils/JobHistoryHelper.scala | 10 +++++-----
.../computation/physical/CodeLogicalUnitExecTask.scala | 13 +++++++++++++
.../orchestrator/ecm/service/EngineConnExecutor.scala | 2 ++
.../ecm/service/impl/ComputationEngineConnExecutor.scala | 2 ++
.../linkis/jobhistory/conversions/TaskConversions.scala | 10 +++++++++-
9 files changed, 56 insertions(+), 11 deletions(-)
diff --git
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
index 2844fce46..8f5a68008 100644
---
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
+++
b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
@@ -63,6 +63,9 @@ public interface TaskConstant {
String ENTRANCEJOB_ENGINECONN_MAP = "engineconnMap";
String ENGINE_INSTANCE = "engineInstance";
+ String TICKET_ID = "ticketId";
+ String ENGINE_CONN_TASK_ID = "engineConnTaskId";
+ String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime";
String PARAMS_DATA_SOURCE = "dataSources";
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 608c2da2a..039c1060c 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -49,6 +49,7 @@ import
org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import
org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.engineconn.executor.listener.event.EngineConnSyncEvent
+import org.apache.linkis.engineconn.launch.EngineConnServer
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.exception.engineconn.{
EngineConnExecutorErrorCode,
@@ -436,6 +437,15 @@ class TaskExecutionServiceImpl
}
val extraInfoMap = new util.HashMap[String, Object]()
extraInfoMap.put(TaskConstant.ENGINE_INSTANCE,
Sender.getThisInstance)
+ extraInfoMap.put(
+ TaskConstant.TICKET_ID,
+ EngineConnServer.getEngineCreationContext.getTicketId
+ )
+ extraInfoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, task.getTaskId)
+ extraInfoMap.put(
+ TaskConstant.ENGINE_CONN_SUBMIT_TIME,
+ System.currentTimeMillis.toString
+ )
// todo add other info
var respRunningInfo: ResponseTaskRunningInfo = null
if (null != resourceResponse) {
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 8b7d0d301..f298e5425 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -113,6 +113,7 @@ abstract class EntranceServer extends Logging {
t.setProgress(EntranceJob.JOB_COMPLETED_PROGRESS.toString)
val infoMap = new util.HashMap[String, AnyRef]
infoMap.put(TaskConstant.ENGINE_INSTANCE, "NULL")
+ infoMap.put(TaskConstant.TICKET_ID, "")
infoMap.put("message", "Task interception failed and cannot be
retried")
JobHistoryHelper.updateJobRequestMetrics(jobRequest, null, infoMap)
case _ =>
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index d2b6d0efc..7b5e3cc3c 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -82,14 +82,20 @@ class DefaultEntranceExecutor(id: Long)
orchestratorFuture.operate[ProgressProcessor](DefaultProgressOperation.PROGRESS_NAME)
progressProcessor.doOnObtain(progressInfoEvent => {
if (null != entranceJob) {
+ // Update the job metrics in the database first, before notifying the progress listeners
+ try {
+ JobHistoryHelper.updateJobRequestMetrics(
+ entranceJob.getJobRequest,
+ progressInfoEvent.resourceMap,
+ progressInfoEvent.infoMap
+ )
+ } catch {
+ case e: Exception =>
+ logger.error("update job metrics error", e)
+ }
entranceJob.getProgressListener.foreach(
_.onProgressUpdate(entranceJob, progressInfoEvent.progress,
entranceJob.getProgressInfo)
)
- JobHistoryHelper.updateJobRequestMetrics(
- entranceJob.getJobRequest,
- progressInfoEvent.resourceMap,
- progressInfoEvent.infoMap
- )
}
})
progressProcessor
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index fa13d683b..0fc7e6e48 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -190,15 +190,15 @@ object JobHistoryHelper extends Logging {
metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP,
engineInstanceMap)
}
val infoMap = ecInfo
- if (null != infoMap && infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
- val instance =
infoMap.get(TaskConstant.ENGINE_INSTANCE).asInstanceOf[String]
+ if (null != infoMap && infoMap.containsKey(TaskConstant.TICKET_ID)) {
+ val ticketId = infoMap.get(TaskConstant.TICKET_ID).asInstanceOf[String]
val engineExtraInfoMap = engineInstanceMap
- .getOrDefault(instance, new util.HashMap[String, AnyRef])
+ .getOrDefault(ticketId, new util.HashMap[String, AnyRef])
.asInstanceOf[util.HashMap[String, AnyRef]]
engineExtraInfoMap.putAll(infoMap)
- engineInstanceMap.put(instance, engineExtraInfoMap)
+ engineInstanceMap.put(ticketId, engineExtraInfoMap)
} else {
- logger.warn("Ec info map must contains ECInstance")
+ logger.warn("Ec info map must contains ticketID")
}
}
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index 86c1cad86..bd665cd34 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -29,6 +29,7 @@ import org.apache.linkis.orchestrator.computation.execute.{
CodeExecTaskExecutorManager
}
import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
+import
org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor
import org.apache.linkis.orchestrator.exception.{
OrchestratorErrorCodeSummary,
OrchestratorErrorException,
@@ -120,6 +121,18 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask],
children: Array[ExecTask
TaskConstant.ENGINE_INSTANCE,
codeExecutor.getEngineConnExecutor.getServiceInstance.getInstance
)
+ infoMap.put(
+ TaskConstant.TICKET_ID,
+ // Ensure that the job metric has at least one EC record.
+ // When the EC is reused, the same EC may have two records: one keyed by instance and the other keyed by ticketId
+ if (codeExecutor.getEngineConnExecutor.isReuse()) {
+ codeExecutor.getEngineConnExecutor.getServiceInstance.getInstance
+ } else {
+ codeExecutor.getEngineConnExecutor.getTicketId
+ }
+ )
+ infoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, engineConnExecId)
+ infoMap.put(TaskConstant.ENGINE_CONN_SUBMIT_TIME,
System.currentTimeMillis.toString)
val event = TaskRunningInfoEvent(
this,
0f,
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
index c0d225065..2052560bb 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
@@ -74,6 +74,8 @@ trait EngineConnExecutor extends Closeable {
def setReuse(reuse: Boolean): EngineConnExecutor
+ def getTicketId: String
+
override def equals(other: Any): Boolean = other match {
case that: EngineConnExecutor =>
(that canEqual this) &&
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
index 2322be2d6..cb7998e5c 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/impl/ComputationEngineConnExecutor.scala
@@ -46,6 +46,8 @@ class ComputationEngineConnExecutor(engineNode: EngineNode)
extends AbstractEngi
private def getEngineConnSender: Sender =
Sender.getSender(getServiceInstance)
+ override def getTicketId: String = engineNode.getTicketId
+
override def close(): Unit = {
logger.info("Start to release engineConn {}", getServiceInstance)
val requestManagerUnlock =
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
index 929ead32b..3ebd8ff41 100644
---
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
+++
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
@@ -292,7 +292,15 @@ object TaskConversions extends Logging {
.get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
.asInstanceOf[util.Map[String, Object]]
if (null != engineMap && !engineMap.isEmpty) {
- taskVO.setEngineInstance(engineMap.map(_._1).toList.mkString(","))
+ // the engineInstance values in metrics may be repeated, so they need to be de-duplicated
+ val engineInstances =
+ engineMap.asScala
+ .map(_._2.asInstanceOf[util.Map[String, Object]])
+ .map(_.get(TaskConstant.ENGINE_INSTANCE))
+ .toList
+ .distinct
+ .mkString(",")
+ taskVO.setEngineInstance(engineInstances)
}
} else if (TaskStatus.Failed.toString.equals(job.getStatus)) {
taskVO.setCanRetry(true)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]