This is an automated email from the ASF dual-hosted git repository. casion pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit c73dd7f276fca34c40ef64be7833c9004bc08433 Author: peacewong <[email protected]> AuthorDate: Wed Jul 26 22:44:31 2023 +0800 add startup params to once task job context --- .../common/utils/OnceExecutorContentUtils.scala | 10 ++++---- .../entrance/interceptor/OnceJobInterceptor.scala | 2 +- .../service/engine/DefaultEngineReuseService.java | 30 +++++++++++++--------- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala index dd4b9bcff..2c426339b 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala @@ -58,14 +58,14 @@ object OnceExecutorContentUtils { def mapToContent(contentMap: util.Map[String, Object]): OnceExecutorContent = { val onceExecutorContent = new OnceExecutorContent - implicit def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match { + def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match { case map: util.Map[String, Object] => map case _ => null } - onceExecutorContent.setJobContent(TaskConstant.JOB_CONTENT) - onceExecutorContent.setRuntimeMap(TaskConstant.PARAMS_CONFIGURATION_RUNTIME) - onceExecutorContent.setSourceMap(TaskConstant.SOURCE) - onceExecutorContent.setVariableMap(TaskConstant.PARAMS_VARIABLE) + onceExecutorContent.setJobContent(getOrNull(TaskConstant.JOB_CONTENT)) + onceExecutorContent.setRuntimeMap(getOrNull(TaskConstant.PARAMS_CONFIGURATION_RUNTIME)) + onceExecutorContent.setSourceMap(getOrNull(TaskConstant.SOURCE)) + onceExecutorContent.setVariableMap(getOrNull(TaskConstant.PARAMS_VARIABLE)) onceExecutorContent } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala index 1291a8566..9b0578980 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala @@ -94,8 +94,8 @@ class OnceJobInterceptor extends EntranceInterceptor { s"/tmp/${task.getExecuteUser}/${task.getId}" protected def getJobContent(task: JobRequest): util.Map[String, AnyRef] = { - // TODO Wait for optimizing since the class `JobRequest` is waiting for optimizing . val jobContent = new util.HashMap[String, AnyRef] + jobContent.putAll(TaskUtils.getStartupMap(task.getParams)) jobContent.put(TaskConstant.CODE, task.getExecutionCode) task.getLabels.foreach { case label: CodeLanguageLabel => diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java index 221df4b93..5f372cae6 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java @@ -164,6 +164,11 @@ public class DefaultEngineReuseService extends AbstractEngineService implements instances.keySet().toArray(new ScoreServiceInstance[0]); EngineNode[] engineScoreList = getEngineNodeManager().getEngineNodes(scoreServiceInstances); + if (null == engineScoreList || engineScoreList.length == 0) { + throw new LinkisRetryException( + AMConstant.ENGINE_ERROR_CODE, "No engine can be reused, cause from db is null"); + } + List<EngineNode> engines = Lists.newArrayList(); long timeout = engineReuseRequest.getTimeOut() <= 0 @@ -177,8 +182,10 @@ public class DefaultEngineReuseService extends AbstractEngineService implements long startTime = System.currentTimeMillis(); try { MutablePair<Integer, Integer> limitPair = MutablePair.of(1, reuseLimit); + List<EngineNode> canReuseEcList = new ArrayList<>(); + CollectionUtils.addAll(canReuseEcList, engineScoreList); LinkisUtils.waitUntil( - () -> selectEngineToReuse(limitPair, engines, engineScoreList), + () -> selectEngineToReuse(limitPair, engines, canReuseEcList), Duration.ofMillis(timeout)); } catch (TimeoutException e) { throw new LinkisRetryException( @@ -220,19 +227,22 @@ public class DefaultEngineReuseService extends AbstractEngineService implements public boolean selectEngineToReuse( MutablePair<Integer, Integer> count2reuseLimit, List<EngineNode> engines, - EngineNode[] engineScoreList) { + List<EngineNode> canReuseEcList) { if (count2reuseLimit.getLeft() > count2reuseLimit.getRight()) { throw new LinkisRetryException( AMConstant.ENGINE_ERROR_CODE, - "Engine reuse exceeds limit: " + count2reuseLimit.getRight()); + "Engine reuse exceeds limit: " + count2reuseLimit.getLeft()); } - count2reuseLimit.setLeft(count2reuseLimit.getLeft() + 1); - Optional<Node> choseNode = nodeSelector.choseNode(engineScoreList); + + Optional<Node> choseNode = nodeSelector.choseNode(canReuseEcList.toArray(new Node[0])); if (!choseNode.isPresent()) { throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused"); } EngineNode engineNode = (EngineNode) choseNode.get(); - logger.info("prepare to reuse engineNode: " + engineNode.getServiceInstance()); + logger.info( + "prepare to reuse engineNode: {} times {}", + engineNode.getServiceInstance(), + count2reuseLimit.getLeft()); EngineNode reuseEngine = LinkisUtils.tryCatch( @@ -254,12 +264,8 @@ public class DefaultEngineReuseService extends AbstractEngineService implements } if (CollectionUtils.isEmpty(engines)) { - Integer count = count2reuseLimit.getKey() + 1; - count2reuseLimit.setLeft(count); - engineScoreList = - Arrays.stream(engineScoreList) - .filter(node -> !node.equals(choseNode.get())) - .toArray(EngineNode[]::new); + count2reuseLimit.setLeft(count2reuseLimit.getLeft() + 1); + canReuseEcList.remove(choseNode.get()); } return CollectionUtils.isNotEmpty(engines); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
