This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch dev-1.3.2 in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit b365555cb12a11f5c0a15b61918fa2e734ac6a8e Author: Jack Xu <[email protected]> AuthorDate: Tue Nov 22 18:34:48 2022 +0800 refactor: move the buildEngineConnBuildRequest to linkisManage and add engineplugin route --- .../apache/linkis/rpc/conf/RPCConfiguration.scala | 7 ++++++ .../server/service/EngineConnLaunchService.scala | 7 ++---- .../impl/LinuxProcessEngineConnLaunchService.scala | 25 +++++----------------- .../manager/am/manager/DefaultEMNodeManager.scala | 6 +++--- .../linkis/manager/am/manager/EMNodeManager.scala | 9 +++++--- .../manager/am/pointer/DefaultEMNodPointer.scala | 6 +++--- .../am/service/em/DefaultEMEngineService.scala | 9 +++++++- .../service/common/pointer/EMNodPointer.scala | 4 ++-- ...SparkSubmitProcessEngineConnLaunchBuilder.scala | 3 ++- .../linkis/gateway/parser/GatewayParser.scala | 2 ++ 10 files changed, 40 insertions(+), 38 deletions(-) diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala index a68999254..5f6a06569 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala @@ -73,6 +73,13 @@ object RPCConfiguration { "metadatamanager,metadataquery" ).getValue.split(",") + val LINKIS_MANAGER_SERVICE_NAME: CommonVars[String] = + CommonVars("wds.linkis.gateway.conf.linkismanager.name", "linkis-cg-linkismanager") + + val LINKIS_MANAGER_SERVICE_LIST: Array[String] = + CommonVars("wds.linkis.gateway.conf.linkismanager.list", "linkisManager,engineplugin").getValue + .split(",") + val BDP_RPC_INSTANCE_ALIAS_SERVICE_REFRESH_INTERVAL: CommonVars[TimeType] = CommonVars("wds.linkis.rpc.instancealias.refresh.interval", new TimeType("3s")) diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/EngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/EngineConnLaunchService.scala index 92520f7a9..a3051d08b 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/EngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/EngineConnLaunchService.scala @@ -18,10 +18,7 @@ package org.apache.linkis.ecm.server.service import org.apache.linkis.manager.common.entity.node.EngineNode -import org.apache.linkis.manager.engineplugin.common.launch.entity.{ - EngineConnBuildRequest, - EngineConnLaunchRequest -} +import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest trait EngineConnLaunchService { @@ -30,7 +27,7 @@ trait EngineConnLaunchService { * @param engineConnBuildRequest * 封装了引擎启动的参数 */ - def launchEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineNode + def launchEngineConn(engineConnLaunchRequest: EngineConnLaunchRequest): EngineNode def launchEngineConn(engineConnLaunchRequest: EngineConnLaunchRequest, duration: Long): EngineNode diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/LinuxProcessEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/LinuxProcessEngineConnLaunchService.scala index ce24bc476..63c7a928e 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/LinuxProcessEngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/LinuxProcessEngineConnLaunchService.scala @@ -25,31 +25,16 @@ import org.apache.linkis.ecm.core.launch.{ import org.apache.linkis.ecm.linux.launch.LinuxProcessEngineConnLaunch import org.apache.linkis.ecm.server.conf.ECMConfiguration._ import org.apache.linkis.manager.common.entity.node.EngineNode -import org.apache.linkis.manager.engineplugin.common.launch.entity.{ - EngineConnBuildRequest, - EngineConnLaunchRequest -} -import org.apache.linkis.rpc.Sender +import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest import org.apache.linkis.rpc.message.annotation.Receiver class LinuxProcessEngineConnLaunchService extends ProcessEngineConnLaunchService { @Receiver - def launchEngineConn( - engineConnBuildRequest: EngineConnBuildRequest, - sender: Sender - ): EngineNode = { - Sender.getSender(MANAGER_SERVICE_NAME).ask(engineConnBuildRequest) match { - case request: EngineConnLaunchRequest if ENGINECONN_CREATE_DURATION._1 != 0L => - launchEngineConn(request, ENGINECONN_CREATE_DURATION._1) - } - } - - override def launchEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineNode = { - Sender.getSender(MANAGER_SERVICE_NAME).ask(engineConnBuildRequest) match { - case request: EngineConnLaunchRequest => - launchEngineConn(request, ENGINECONN_CREATE_DURATION._1) - } + override def launchEngineConn(engineConnLaunchRequest: EngineConnLaunchRequest): EngineNode = { + if (ENGINECONN_CREATE_DURATION._1 != 0L) { + launchEngineConn(engineConnLaunchRequest, ENGINECONN_CREATE_DURATION._1) + } else null } def createDiscoveryMsgGenerator: DiscoveryMsgGenerator = new EurekaDiscoveryMsgGenerator diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala index c3769bef2..95f9ebc75 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala @@ -23,7 +23,7 @@ import org.apache.linkis.manager.common.entity.node._ import org.apache.linkis.manager.common.entity.persistence.PersistenceNodeEntity import org.apache.linkis.manager.common.protocol.em.{ECMOperateRequest, ECMOperateResponse} import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest -import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest +import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest import org.apache.linkis.manager.exception.NodeInstanceDuplicateException import org.apache.linkis.manager.persistence.{NodeManagerPersistence, NodeMetricManagerPersistence} import org.apache.linkis.manager.rm.service.ResourceManager @@ -176,10 +176,10 @@ class DefaultEMNodeManager extends EMNodeManager with Logging { * @return */ override def createEngine( - engineBuildRequest: EngineConnBuildRequest, + engineConnLaunchRequest: EngineConnLaunchRequest, emNode: EMNode ): EngineNode = { - nodePointerBuilder.buildEMNodePointer(emNode).createEngine(engineBuildRequest) + nodePointerBuilder.buildEMNodePointer(emNode).createEngine(engineConnLaunchRequest) } override def stopEngine(engineStopRequest: EngineStopRequest, emNode: EMNode): Unit = { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/EMNodeManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/EMNodeManager.scala index 77fb994c3..404939a6f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/EMNodeManager.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/EMNodeManager.scala @@ -21,7 +21,7 @@ import org.apache.linkis.common.ServiceInstance import org.apache.linkis.manager.common.entity.node.{EMNode, EngineNode, Node, ScoreServiceInstance} import org.apache.linkis.manager.common.protocol.em.{ECMOperateRequest, ECMOperateResponse} import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest -import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest +import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest trait EMNodeManager { @@ -52,11 +52,14 @@ trait EMNodeManager { /** * 1. request engineManager to launch engine 2. persist engine info * - * @param engineBuildRequest + * @param engineConnLaunchRequest + * engine launch request * @param emNode + * ecm node * @return + * engine node */ - def createEngine(engineBuildRequest: EngineConnBuildRequest, emNode: EMNode): EngineNode + def createEngine(engineConnLaunchRequest: EngineConnLaunchRequest, emNode: EMNode): EngineNode def stopEngine(engineStopRequest: EngineStopRequest, emNode: EMNode): Unit diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.scala index 7c9339c88..5bd74edcd 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/pointer/DefaultEMNodPointer.scala @@ -24,7 +24,7 @@ import org.apache.linkis.manager.common.constant.AMConstant import org.apache.linkis.manager.common.entity.node.{EngineNode, Node} import org.apache.linkis.manager.common.protocol.em.{ECMOperateRequest, ECMOperateResponse} import org.apache.linkis.manager.common.protocol.engine.{EngineStopRequest, EngineStopResponse} -import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest +import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest import org.apache.linkis.manager.service.common.pointer.EMNodPointer class DefaultEMNodPointer(val node: Node) extends AbstractNodePointer with EMNodPointer { @@ -36,9 +36,9 @@ class DefaultEMNodPointer(val node: Node) extends AbstractNodePointer with EMNod */ override def getNode(): Node = node - override def createEngine(engineBuildRequest: EngineConnBuildRequest): EngineNode = { + override def createEngine(engineConnLaunchRequest: EngineConnLaunchRequest): EngineNode = { logger.info(s"Start to createEngine ask em ${getNode().getServiceInstance}") - getSender.ask(engineBuildRequest) match { + getSender.ask(engineConnLaunchRequest) match { case engineNode: EngineNode => logger.info( s"Succeed to createEngine ask em ${getNode().getServiceInstance}, engineNode $engineNode " diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala index fbeec1f5b..995b10b87 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala @@ -18,6 +18,7 @@ package org.apache.linkis.manager.am.service.em import org.apache.linkis.common.utils.Logging +import org.apache.linkis.engineplugin.server.service.EngineConnLaunchService import org.apache.linkis.manager.am.exception.AMErrorException import org.apache.linkis.manager.am.manager.{EMNodeManager, EngineNodeManager} import org.apache.linkis.manager.am.service.EMEngineService @@ -53,6 +54,9 @@ class DefaultEMEngineService extends EMEngineService with Logging { @Autowired private var nodeLabelService: NodeLabelService = _ + @Autowired + private var engineConnLaunchService: EngineConnLaunchService = _ + @Autowired private var labelFilter: LabelFilter = _ @@ -68,7 +72,10 @@ class DefaultEMEngineService extends EMEngineService with Logging { ): EngineNode = { logger.info(s"EM ${emNode.getServiceInstance} start to create Engine ${engineBuildRequest}") - val engineNode = emNodeManager.createEngine(engineBuildRequest, emNode) + val engineConnLaunchRequest = + engineConnLaunchService.createEngineConnLaunchRequest(engineBuildRequest) + + val engineNode = emNodeManager.createEngine(engineConnLaunchRequest, emNode) logger.info( s"EM ${emNode.getServiceInstance} Finished to create Engine ${engineBuildRequest.ticketId}" ) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/EMNodPointer.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/EMNodPointer.scala index e3d7c40db..803c191a0 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/EMNodPointer.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/EMNodPointer.scala @@ -20,11 +20,11 @@ package org.apache.linkis.manager.service.common.pointer import org.apache.linkis.manager.common.entity.node.EngineNode import org.apache.linkis.manager.common.protocol.em.{ECMOperateRequest, ECMOperateResponse} import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest -import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest +import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest trait EMNodPointer extends NodePointer { - def createEngine(engineBuildRequest: EngineConnBuildRequest): EngineNode + def createEngine(engineConnLaunchRequest: EngineConnLaunchRequest): EngineNode def stopEngine(engineStopRequest: EngineStopRequest): Unit diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index d38caaa6e..f9b53f8e1 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -350,13 +350,14 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin this.deployMode("client") val driverJavaSet = new StringBuilder(" -server") - if (StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue)) + if (StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue)) { EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue .format(getGcLogDir(engineRequest)) .split("\\s+") .foreach(l => { driverJavaSet.append(" ").append(l) }) + } getLogDir(engineRequest).trim .split(" ") .foreach(l => { diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/parser/GatewayParser.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/parser/GatewayParser.scala index 91debac06..dd336809a 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/parser/GatewayParser.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/parser/GatewayParser.scala @@ -124,6 +124,8 @@ class DefaultGatewayParser(gatewayParsers: Array[GatewayParser]) extends Abstrac // In order to be compatible with metadata module name refactoring,this logic will be removed in subsequent versions } else if (RPCConfiguration.METADATAQUERY_SERVICE_LIST.contains(serviceId)) { RPCConfiguration.METADATAQUERY_SERVICE_APPLICATION_NAME.getValue + } else if (RPCConfiguration.LINKIS_MANAGER_SERVICE_LIST.contains(serviceId)) { + RPCConfiguration.LINKIS_MANAGER_SERVICE_NAME.getValue } else { serviceId } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
