This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new ecd6dab18 [Feature][udf] Load specific udfs as we needed (#3756)
ecd6dab18 is described below
commit ecd6dab1878e0ad702781fa936e333d0c946793d
Author: rarexixi <[email protected]>
AuthorDate: Sat Nov 5 18:06:05 2022 +0800
[Feature][udf] Load specific udfs as we needed (#3756)
* [Feature][udf] Load specific udfs as we needed
---
.../linkis/ecm/server/hook/JarUDFLoadECMHook.scala | 10 +++-
.../executor/hook/UDFLoadEngineConnHook.scala | 57 +++++++++++++---------
.../scala/org/apache/linkis/udf/UDFClient.scala | 31 ++++++++++--
.../apache/linkis/udf/api/rpc/RequestUdfIds.scala} | 29 ++---------
.../apache/linkis/udf/api/rpc/ResponseUdfs.scala} | 26 +---------
.../java/org/apache/linkis/udf/dao/UDFDao.java | 3 ++
.../java/org/apache/linkis/udf/dao/impl/UDFDao.xml | 14 ++++++
.../org/apache/linkis/udf/service/UDFService.java | 2 +
.../linkis/udf/service/impl/UDFServiceImpl.java | 8 +++
.../apache/linkis/udf/api/rpc/UdfReceiver.scala | 12 ++++-
.../linkis/udf/api/rpc/UdfReceiverChooser.scala | 7 ++-
11 files changed, 121 insertions(+), 78 deletions(-)
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
index 1ac09b227..26cccfc9c 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
@@ -56,6 +56,12 @@ class JarUDFLoadECMHook extends ECMHook with Logging {
logger.info("start loading UDFs")
val user = pel.user
val ticketId = pel.ticketId
+
+ val udfAllLoad =
+ pel.creationDesc.properties.getOrDefault("linkis.user.udf.all.load",
"true").toBoolean
+ val udfIdStr =
pel.creationDesc.properties.getOrDefault("linkis.user.udf.custom.ids", "")
+ val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s
=> s.toLong)
+
val engineType = LabelUtil.getEngineType(pel.labels)
val workDir = localDirsHandleService.getEngineConnWorkDir(user,
ticketId, engineType)
val pubDir = localDirsHandleService.getEngineConnPublicDir
@@ -66,7 +72,9 @@ class JarUDFLoadECMHook extends ECMHook with Logging {
fs.setPermission(fsPath, "rwxrwxrwx")
}
- val udfInfos = UDFClient.getJarUdf(pel.user)
+ val udfInfos =
+ if (udfAllLoad) UDFClient.getJarUdf(pel.user)
+ else UDFClient.getJarUdfByIds(pel.user, udfIds)
val fileNameSet: mutable.HashSet[String] = new
mutable.HashSet[String]()
import util.control.Breaks._
udfInfos.foreach { udfInfo =>
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala
index ca73e8823..91af2811a 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala
@@ -43,7 +43,7 @@ import org.apache.commons.lang3.StringUtils
import java.io.File
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.ArrayBuffer
abstract class UDFLoad extends Logging {
@@ -58,11 +58,11 @@ abstract class UDFLoad extends Logging {
protected def constructCode(udfInfo: UDFInfoVo): String
- protected def generateCode(user: String): Array[String] = {
+ protected def generateCode(): Array[String] = {
val codeBuffer = new ArrayBuffer[String]
val statementBuffer = new ArrayBuffer[String]
var accept = true
- getLoadUdfCode(user).split("\n").foreach {
+ getLoadUdfCode.split("\n").foreach {
case "" =>
case l if l.startsWith("%") =>
if (acceptCodeType(l)) {
@@ -83,12 +83,30 @@ abstract class UDFLoad extends Logging {
line.startsWith("%" + runType.toString)
}
- protected def getLoadUdfCode(user: String): String = {
- logger.info("start loading UDFs")
- val udfInfos = UDFClient.getUdfInfosByUdfType(user, category, udfType)
+ protected def getLoadUdfCode: String = {
+ val engineCreationContext =
+
EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
+ val user = engineCreationContext.getUser
+ val udfAllLoad =
+
engineCreationContext.getOptions.getOrDefault("linkis.user.udf.all.load",
"true").toBoolean
+ val udfIdStr =
engineCreationContext.getOptions.getOrDefault("linkis.user.udf.custom.ids", "")
+ val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s =>
s.toLong)
+
+ logger.info(
+ s"start loading UDFs, user: $user, load all: $udfAllLoad, udfIds:
${udfIds.mkString("Array(", ", ", ")")}"
+ )
+
+ val udfInfos = if (udfAllLoad) {
+ UDFClient.getUdfInfosByUdfType(user, category, udfType)
+ } else {
+ UDFClient.getUdfInfosByUdfIds(user, udfIds, category, udfType)
+ }
logger.info("all udfs: ")
+
udfInfos.foreach { l =>
- logger.info("udfName:" + l.getUdfName + " bml_resource_id:" +
l.getBmlResourceId + "\n")
+ logger.info(
+ s"udfName:${l.getUdfName}, bml_resource_id:${l.getBmlResourceId},
bml_id:${l.getId}\n"
+ )
}
udfInfos
.filter { info => StringUtils.isNotEmpty(info.getBmlResourceId) }
@@ -126,18 +144,13 @@ abstract class UDFLoad extends Logging {
}
}
- private def getFunctionCode(): Array[String] = {
- val engineCreationContext =
-
EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
- val user = engineCreationContext.getUser
- Utils.tryCatch(generateCode(user)) { case t: Throwable =>
- if (!ComputationExecutorConf.UDF_LOAD_FAILED_IGNORE.getValue) {
- logger.error("Failed to load function, executor close ")
- throw t
- } else {
- logger.error("Failed to load function", t)
- Array.empty[String]
- }
+ private def getFunctionCode: Array[String] = Utils.tryCatch(generateCode())
{ t: Throwable =>
+ if (!ComputationExecutorConf.UDF_LOAD_FAILED_IGNORE.getValue) {
+ logger.error("Failed to load function, executor close ")
+ throw t
+ } else {
+ logger.error("Failed to load function", t)
+ Array.empty[String]
}
}
@@ -148,7 +161,7 @@ abstract class UDFLoad extends Logging {
codes.foreach { code =>
logger.info("Submit function registration to engine, code: " + code)
Utils.tryCatch(executor.executeLine(new
EngineExecutionContext(executor), code)) {
- case t: Throwable =>
+ t: Throwable =>
if (!ComputationExecutorConf.UDF_LOAD_FAILED_IGNORE.getValue) {
Utils.tryQuietly(executor.close())
logger.error("Failed to load function, executor close ")
@@ -163,7 +176,7 @@ abstract class UDFLoad extends Logging {
protected def loadFunctions(executor: Executor): Unit = {
- val codes = getFunctionCode()
+ val codes = getFunctionCode
if (null != codes && codes.nonEmpty) {
executor match {
case computationExecutor: ComputationExecutor =>
@@ -176,7 +189,7 @@ abstract class UDFLoad extends Logging {
protected def loadUDF(labels: Array[Label[_]]): Unit = {
- val codes = getFunctionCode()
+ val codes = getFunctionCode
if (null != codes && codes.nonEmpty) {
val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
executor match {
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala
b/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala
index 4f9b69e79..da73139df 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala
@@ -18,14 +18,14 @@
package org.apache.linkis.udf
import org.apache.linkis.rpc.Sender
-import org.apache.linkis.udf.api.rpc.{RequestUdfTree, ResponseUdfTree}
-import org.apache.linkis.udf.entity.{UDFInfo, UDFTree}
+import org.apache.linkis.udf.api.rpc.{RequestUdfIds, RequestUdfTree,
ResponseUdfs, ResponseUdfTree}
+import org.apache.linkis.udf.entity.UDFTree
import org.apache.linkis.udf.utils.ConstantVar
import org.apache.linkis.udf.vo.UDFInfoVo
import org.apache.commons.collections.CollectionUtils
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.ArrayBuffer
object UDFClient {
@@ -51,10 +51,35 @@ object UDFClient {
udfInfoBuilder
}
+ def getUdfInfosByUdfIds(
+ userName: String,
+ udfIds: Array[Long],
+ category: String,
+ udfType: BigInt
+ ): ArrayBuffer[UDFInfoVo] = {
+ val udfInfoBuilder = new ArrayBuffer[UDFInfoVo]
+
+ val udfTree = Sender
+ .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
+ .ask(RequestUdfIds(userName, udfIds, category))
+ .asInstanceOf[ResponseUdfs]
+
+ if (CollectionUtils.isNotEmpty(udfTree.udfInfos)) {
+ udfTree.udfInfos.asScala
+ .filter(infoVo => infoVo.getUdfType == udfType)
+ .foreach(infoVo => udfInfoBuilder.append(infoVo))
+ }
+ udfInfoBuilder
+ }
+
def getJarUdf(userName: String): ArrayBuffer[UDFInfoVo] = {
getUdfInfosByUdfType(userName, ConstantVar.UDF, ConstantVar.UDF_JAR)
}
+ def getJarUdfByIds(userName: String, udfIds: Array[Long]):
ArrayBuffer[UDFInfoVo] = {
+ getUdfInfosByUdfIds(userName, udfIds, ConstantVar.UDF, ConstantVar.UDF_JAR)
+ }
+
def getPyUdf(userName: String): ArrayBuffer[UDFInfoVo] = {
getUdfInfosByUdfType(userName, ConstantVar.UDF, ConstantVar.UDF_PY)
}
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/RequestUdfIds.scala
similarity index 54%
copy from
linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
copy to
linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/RequestUdfIds.scala
index 271589e9a..3f0891e39 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/RequestUdfIds.scala
@@ -17,28 +17,9 @@
package org.apache.linkis.udf.api.rpc
-import org.apache.linkis.rpc.{Receiver, ReceiverChooser, RPCMessageEvent}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.protocol.{CacheableProtocol, RetryableProtocol}
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.Component
-
-import javax.annotation.PostConstruct
-
-@Component
-class UdfReceiverChooser extends ReceiverChooser {
-
- @Autowired
- private var udfTreeService: UDFTreeService = _
-
- private var udfReceiver: Option[UdfReceiver] = None
-
- @PostConstruct
- def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService))
-
- override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] =
event.message match {
- case _: UdfProtocol => udfReceiver
- case _ => None
- }
-
-}
+case class RequestUdfIds(userName: String, udfIds: Array[Long], treeCategory:
String)
+ extends RetryableProtocol
+ with CacheableProtocol
+ with UdfProtocol
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/ResponseUdfs.scala
similarity index 54%
copy from
linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
copy to
linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/ResponseUdfs.scala
index 271589e9a..054792207 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/ResponseUdfs.scala
@@ -17,28 +17,6 @@
package org.apache.linkis.udf.api.rpc
-import org.apache.linkis.rpc.{Receiver, ReceiverChooser, RPCMessageEvent}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.udf.vo.UDFInfoVo
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.Component
-
-import javax.annotation.PostConstruct
-
-@Component
-class UdfReceiverChooser extends ReceiverChooser {
-
- @Autowired
- private var udfTreeService: UDFTreeService = _
-
- private var udfReceiver: Option[UdfReceiver] = None
-
- @PostConstruct
- def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService))
-
- override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] =
event.message match {
- case _: UdfProtocol => udfReceiver
- case _ => None
- }
-
-}
+class ResponseUdfs(val udfInfos: java.util.List[UDFInfoVo]) extends UdfProtocol
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java
index cf86f8965..b94e42876 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java
@@ -61,6 +61,9 @@ public interface UDFDao {
List<UDFInfoVo> getUDFInfoByTreeId(
Long treeId, String userName, Collection<Integer> categoryCodes);
+ List<UDFInfoVo> getUDFInfoByIds(
+ @Param("ids") Long[] ids, @Param("categoryCodes") Collection<Integer>
categoryCodes);
+
List<UDFInfo> getLoadedUDFs(String userName);
List<Long> getLoadedUDFIds(String userName);
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml
index 6c11405f7..081a2d108 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml
@@ -164,6 +164,20 @@
on a.id=b.udf_id
</select>
+ <select id="getUDFInfoByIds"
resultType="org.apache.linkis.udf.vo.UDFInfoVo">
+ select pub.*, puv.path, puv.register_format, puv.use_format,
puv.bml_resource_id, puv.bml_resource_version, puv.description
+ from
+ linkis_ps_udf_baseinfo pub
+ join (
+ select udf_id, max(bml_resource_version) as max_version
+ from linkis_ps_udf_version
+ where udf_id in <foreach collection="ids" open="("
separator="," close=")" item="item">#{item}</foreach>
+ group by udf_id
+ ) puvg on pub.id=puvg.udf_id
+ join linkis_ps_udf_version puv on puvg.udf_id=puv.udf_id and
puvg.max_version=puv.bml_resource_version
+ where udf_type in <foreach collection="categoryCodes" open="("
separator="," close=")" item="item">#{item}</foreach>
+ </select>
+
<select id="getLoadedUDFs" resultMap="UDFInfoMap">
select b.*,if(a.udf_id IS NULL,0,1) as is_load
from
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java
index d210be038..325d71522 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java
@@ -52,6 +52,8 @@ public interface UDFService {
List<UDFInfoVo> getUDFInfoByTreeId(Long treeId, String userName, String
category)
throws UDFException;
+ List<UDFInfoVo> getUDFInfoByIds(Long[] ids, String category) throws
UDFException;
+
Map<String, List<String>> generateInitSql(String userName) throws
UDFException;
Iterator<String> getAllLoadJars(String userName) throws UDFException;
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java
index 0029b8308..f4ad874fb 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java
@@ -806,6 +806,14 @@ public class UDFServiceImpl implements UDFService {
return udfDao.getUDFInfoByTreeId(treeId, userName,
categoryToCodes.get(category));
}
+ @Override
+ public List<UDFInfoVo> getUDFInfoByIds(Long[] ids, String category) {
+ if (ids == null || ids.length == 0) {
+ return new ArrayList<>(0);
+ }
+ return udfDao.getUDFInfoByIds(ids, categoryToCodes.get(category));
+ }
+
/**
* Generate sql needs content: divided into jar, python, scala Save Path and
registration syntax
* separately 生成sql需要内容: 分为jar,python,scala 分别保存Path和注册语法
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala
index 8be45ee5d..501e4d3bc 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala
@@ -18,7 +18,9 @@
package org.apache.linkis.udf.api.rpc
import org.apache.linkis.rpc.{Receiver, Sender}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.udf.service.{UDFService, UDFTreeService}
+
+import java.lang
import scala.concurrent.duration.Duration
@@ -26,9 +28,12 @@ class UdfReceiver extends Receiver {
private var udfTreeService: UDFTreeService = _
- def this(udfTreeService: UDFTreeService) = {
+ private var udfService: UDFService = _
+
+ def this(udfTreeService: UDFTreeService, udfService: UDFService) = {
this()
this.udfTreeService = udfTreeService
+ this.udfService = udfService
}
override def receive(message: Any, sender: Sender): Unit = {}
@@ -38,6 +43,9 @@ class UdfReceiver extends Receiver {
case RequestUdfTree(userName, treeType, treeId, treeCategory) =>
val udfTree = udfTreeService.getTreeById(treeId, userName, treeType,
treeCategory)
new ResponseUdfTree(udfTree)
+ case RequestUdfIds(userName, udfIds, treeCategory) =>
+ val udfs = udfService.getUDFInfoByIds(udfIds.map(id => new
lang.Long(id)), treeCategory)
+ new ResponseUdfs(udfs)
case _ =>
}
}
diff --git
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
index 271589e9a..de7558010 100644
---
a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
+++
b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
@@ -18,7 +18,7 @@
package org.apache.linkis.udf.api.rpc
import org.apache.linkis.rpc.{Receiver, ReceiverChooser, RPCMessageEvent}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.udf.service.{UDFService, UDFTreeService}
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
@@ -31,10 +31,13 @@ class UdfReceiverChooser extends ReceiverChooser {
@Autowired
private var udfTreeService: UDFTreeService = _
+ @Autowired
+ private var udfService: UDFService = _
+
private var udfReceiver: Option[UdfReceiver] = None
@PostConstruct
- def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService))
+ def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService,
udfService))
override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] =
event.message match {
case _: UdfProtocol => udfReceiver
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org