This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new ecd6dab18 [Feature][udf] Load specific udfs as we needed (#3756)
ecd6dab18 is described below

commit ecd6dab1878e0ad702781fa936e333d0c946793d
Author: rarexixi <[email protected]>
AuthorDate: Sat Nov 5 18:06:05 2022 +0800

    [Feature][udf] Load specific udfs as we needed (#3756)
    
    * [Feature][udf] Load specific udfs as we needed
---
 .../linkis/ecm/server/hook/JarUDFLoadECMHook.scala | 10 +++-
 .../executor/hook/UDFLoadEngineConnHook.scala      | 57 +++++++++++++---------
 .../scala/org/apache/linkis/udf/UDFClient.scala    | 31 ++++++++++--
 .../apache/linkis/udf/api/rpc/RequestUdfIds.scala} | 29 ++---------
 .../apache/linkis/udf/api/rpc/ResponseUdfs.scala}  | 26 +---------
 .../java/org/apache/linkis/udf/dao/UDFDao.java     |  3 ++
 .../java/org/apache/linkis/udf/dao/impl/UDFDao.xml | 14 ++++++
 .../org/apache/linkis/udf/service/UDFService.java  |  2 +
 .../linkis/udf/service/impl/UDFServiceImpl.java    |  8 +++
 .../apache/linkis/udf/api/rpc/UdfReceiver.scala    | 12 ++++-
 .../linkis/udf/api/rpc/UdfReceiverChooser.scala    |  7 ++-
 11 files changed, 121 insertions(+), 78 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
index 1ac09b227..26cccfc9c 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
@@ -56,6 +56,12 @@ class JarUDFLoadECMHook extends ECMHook with Logging {
         logger.info("start loading UDFs")
         val user = pel.user
         val ticketId = pel.ticketId
+
+        val udfAllLoad =
+          pel.creationDesc.properties.getOrDefault("linkis.user.udf.all.load", "true").toBoolean
+        val udfIdStr = pel.creationDesc.properties.getOrDefault("linkis.user.udf.custom.ids", "")
+        val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s => s.toLong)
+
         val engineType = LabelUtil.getEngineType(pel.labels)
         val workDir = localDirsHandleService.getEngineConnWorkDir(user, ticketId, engineType)
         val pubDir = localDirsHandleService.getEngineConnPublicDir
@@ -66,7 +72,9 @@ class JarUDFLoadECMHook extends ECMHook with Logging {
           fs.setPermission(fsPath, "rwxrwxrwx")
         }
 
-        val udfInfos = UDFClient.getJarUdf(pel.user)
+        val udfInfos =
+          if (udfAllLoad) UDFClient.getJarUdf(pel.user)
+          else UDFClient.getJarUdfByIds(pel.user, udfIds)
         val fileNameSet: mutable.HashSet[String] = new mutable.HashSet[String]()
         import util.control.Breaks._
         udfInfos.foreach { udfInfo =>
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala
index ca73e8823..91af2811a 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/UDFLoadEngineConnHook.scala
@@ -43,7 +43,7 @@ import org.apache.commons.lang3.StringUtils
 
 import java.io.File
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.collection.mutable.ArrayBuffer
 
 abstract class UDFLoad extends Logging {
@@ -58,11 +58,11 @@ abstract class UDFLoad extends Logging {
 
   protected def constructCode(udfInfo: UDFInfoVo): String
 
-  protected def generateCode(user: String): Array[String] = {
+  protected def generateCode(): Array[String] = {
     val codeBuffer = new ArrayBuffer[String]
     val statementBuffer = new ArrayBuffer[String]
     var accept = true
-    getLoadUdfCode(user).split("\n").foreach {
+    getLoadUdfCode.split("\n").foreach {
       case "" =>
       case l if l.startsWith("%") =>
         if (acceptCodeType(l)) {
@@ -83,12 +83,30 @@ abstract class UDFLoad extends Logging {
     line.startsWith("%" + runType.toString)
   }
 
-  protected def getLoadUdfCode(user: String): String = {
-    logger.info("start loading UDFs")
-    val udfInfos = UDFClient.getUdfInfosByUdfType(user, category, udfType)
+  protected def getLoadUdfCode: String = {
+    val engineCreationContext =
+      EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
+    val user = engineCreationContext.getUser
+    val udfAllLoad =
+      engineCreationContext.getOptions.getOrDefault("linkis.user.udf.all.load", "true").toBoolean
+    val udfIdStr = engineCreationContext.getOptions.getOrDefault("linkis.user.udf.custom.ids", "")
+    val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s => s.toLong)
+
+    logger.info(
+      s"start loading UDFs, user: $user, load all: $udfAllLoad, udfIds: ${udfIds.mkString("Array(", ", ", ")")}"
+    )
+
+    val udfInfos = if (udfAllLoad) {
+      UDFClient.getUdfInfosByUdfType(user, category, udfType)
+    } else {
+      UDFClient.getUdfInfosByUdfIds(user, udfIds, category, udfType)
+    }
     logger.info("all udfs: ")
+
     udfInfos.foreach { l =>
-      logger.info("udfName:" + l.getUdfName + " bml_resource_id:" + l.getBmlResourceId + "\n")
+      logger.info(
+        s"udfName:${l.getUdfName}, bml_resource_id:${l.getBmlResourceId}, bml_id:${l.getId}\n"
+      )
     }
     udfInfos
       .filter { info => StringUtils.isNotEmpty(info.getBmlResourceId) }
@@ -126,18 +144,13 @@ abstract class UDFLoad extends Logging {
     }
   }
 
-  private def getFunctionCode(): Array[String] = {
-    val engineCreationContext =
-      EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
-    val user = engineCreationContext.getUser
-    Utils.tryCatch(generateCode(user)) { case t: Throwable =>
-      if (!ComputationExecutorConf.UDF_LOAD_FAILED_IGNORE.getValue) {
-        logger.error("Failed to load function, executor close ")
-        throw t
-      } else {
-        logger.error("Failed to load function", t)
-        Array.empty[String]
-      }
+  private def getFunctionCode: Array[String] = Utils.tryCatch(generateCode()) { t: Throwable =>
+    if (!ComputationExecutorConf.UDF_LOAD_FAILED_IGNORE.getValue) {
+      logger.error("Failed to load function, executor close ")
+      throw t
+    } else {
+      logger.error("Failed to load function", t)
+      Array.empty[String]
     }
   }
 
@@ -148,7 +161,7 @@ abstract class UDFLoad extends Logging {
     codes.foreach { code =>
       logger.info("Submit function registration to engine, code: " + code)
       Utils.tryCatch(executor.executeLine(new EngineExecutionContext(executor), code)) {
-        case t: Throwable =>
+        t: Throwable =>
           if (!ComputationExecutorConf.UDF_LOAD_FAILED_IGNORE.getValue) {
             Utils.tryQuietly(executor.close())
             logger.error("Failed to load function, executor close ")
@@ -163,7 +176,7 @@ abstract class UDFLoad extends Logging {
 
   protected def loadFunctions(executor: Executor): Unit = {
 
-    val codes = getFunctionCode()
+    val codes = getFunctionCode
     if (null != codes && codes.nonEmpty) {
       executor match {
         case computationExecutor: ComputationExecutor =>
@@ -176,7 +189,7 @@ abstract class UDFLoad extends Logging {
 
   protected def loadUDF(labels: Array[Label[_]]): Unit = {
 
-    val codes = getFunctionCode()
+    val codes = getFunctionCode
     if (null != codes && codes.nonEmpty) {
       val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
       executor match {
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala b/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala
index 4f9b69e79..da73139df 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-client/src/main/scala/org/apache/linkis/udf/UDFClient.scala
@@ -18,14 +18,14 @@
 package org.apache.linkis.udf
 
 import org.apache.linkis.rpc.Sender
-import org.apache.linkis.udf.api.rpc.{RequestUdfTree, ResponseUdfTree}
-import org.apache.linkis.udf.entity.{UDFInfo, UDFTree}
+import org.apache.linkis.udf.api.rpc.{RequestUdfIds, RequestUdfTree, ResponseUdfs, ResponseUdfTree}
+import org.apache.linkis.udf.entity.UDFTree
 import org.apache.linkis.udf.utils.ConstantVar
 import org.apache.linkis.udf.vo.UDFInfoVo
 
 import org.apache.commons.collections.CollectionUtils
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.collection.mutable.ArrayBuffer
 
 object UDFClient {
@@ -51,10 +51,35 @@ object UDFClient {
     udfInfoBuilder
   }
 
+  def getUdfInfosByUdfIds(
+      userName: String,
+      udfIds: Array[Long],
+      category: String,
+      udfType: BigInt
+  ): ArrayBuffer[UDFInfoVo] = {
+    val udfInfoBuilder = new ArrayBuffer[UDFInfoVo]
+
+    val udfTree = Sender
+      .getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
+      .ask(RequestUdfIds(userName, udfIds, category))
+      .asInstanceOf[ResponseUdfs]
+
+    if (CollectionUtils.isNotEmpty(udfTree.udfInfos)) {
+      udfTree.udfInfos.asScala
+        .filter(infoVo => infoVo.getUdfType == udfType)
+        .foreach(infoVo => udfInfoBuilder.append(infoVo))
+    }
+    udfInfoBuilder
+  }
+
   def getJarUdf(userName: String): ArrayBuffer[UDFInfoVo] = {
     getUdfInfosByUdfType(userName, ConstantVar.UDF, ConstantVar.UDF_JAR)
   }
 
+  def getJarUdfByIds(userName: String, udfIds: Array[Long]): ArrayBuffer[UDFInfoVo] = {
+    getUdfInfosByUdfIds(userName, udfIds, ConstantVar.UDF, ConstantVar.UDF_JAR)
+  }
+
   def getPyUdf(userName: String): ArrayBuffer[UDFInfoVo] = {
     getUdfInfosByUdfType(userName, ConstantVar.UDF, ConstantVar.UDF_PY)
   }
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/RequestUdfIds.scala
similarity index 54%
copy from linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
copy to linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/RequestUdfIds.scala
index 271589e9a..3f0891e39 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/RequestUdfIds.scala
@@ -17,28 +17,9 @@
 
 package org.apache.linkis.udf.api.rpc
 
-import org.apache.linkis.rpc.{Receiver, ReceiverChooser, RPCMessageEvent}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.protocol.{CacheableProtocol, RetryableProtocol}
 
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.Component
-
-import javax.annotation.PostConstruct
-
-@Component
-class UdfReceiverChooser extends ReceiverChooser {
-
-  @Autowired
-  private var udfTreeService: UDFTreeService = _
-
-  private var udfReceiver: Option[UdfReceiver] = None
-
-  @PostConstruct
-  def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService))
-
-  override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = event.message match {
-    case _: UdfProtocol => udfReceiver
-    case _ => None
-  }
-
-}
+case class RequestUdfIds(userName: String, udfIds: Array[Long], treeCategory: String)
+    extends RetryableProtocol
+    with CacheableProtocol
+    with UdfProtocol
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/ResponseUdfs.scala
similarity index 54%
copy from linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
copy to linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/ResponseUdfs.scala
index 271589e9a..054792207 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-common/src/main/scala/org/apache/linkis/udf/api/rpc/ResponseUdfs.scala
@@ -17,28 +17,6 @@
 
 package org.apache.linkis.udf.api.rpc
 
-import org.apache.linkis.rpc.{Receiver, ReceiverChooser, RPCMessageEvent}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.udf.vo.UDFInfoVo
 
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.Component
-
-import javax.annotation.PostConstruct
-
-@Component
-class UdfReceiverChooser extends ReceiverChooser {
-
-  @Autowired
-  private var udfTreeService: UDFTreeService = _
-
-  private var udfReceiver: Option[UdfReceiver] = None
-
-  @PostConstruct
-  def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService))
-
-  override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = event.message match {
-    case _: UdfProtocol => udfReceiver
-    case _ => None
-  }
-
-}
+class ResponseUdfs(val udfInfos: java.util.List[UDFInfoVo]) extends UdfProtocol
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java
index cf86f8965..b94e42876 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/UDFDao.java
@@ -61,6 +61,9 @@ public interface UDFDao {
   List<UDFInfoVo> getUDFInfoByTreeId(
       Long treeId, String userName, Collection<Integer> categoryCodes);
 
+  List<UDFInfoVo> getUDFInfoByIds(
+      @Param("ids") Long[] ids, @Param("categoryCodes") Collection<Integer> categoryCodes);
+
   List<UDFInfo> getLoadedUDFs(String userName);
 
   List<Long> getLoadedUDFIds(String userName);
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml
index 6c11405f7..081a2d108 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/dao/impl/UDFDao.xml
@@ -164,6 +164,20 @@
         on a.id=b.udf_id
     </select>
 
+    <select id="getUDFInfoByIds" resultType="org.apache.linkis.udf.vo.UDFInfoVo">
+        select pub.*, puv.path, puv.register_format, puv.use_format, puv.bml_resource_id, puv.bml_resource_version, puv.description
+        from
+            linkis_ps_udf_baseinfo pub
+            join (
+                select udf_id, max(bml_resource_version) as max_version
+                from linkis_ps_udf_version
+                where udf_id in <foreach collection="ids" open="(" separator="," close=")" item="item">#{item}</foreach>
+                group by udf_id
+            ) puvg on pub.id=puvg.udf_id
+            join linkis_ps_udf_version puv on puvg.udf_id=puv.udf_id and puvg.max_version=puv.bml_resource_version
+        where udf_type in <foreach collection="categoryCodes" open="(" separator="," close=")" item="item">#{item}</foreach>
+    </select>
+
     <select id="getLoadedUDFs" resultMap="UDFInfoMap">
         select b.*,if(a.udf_id IS NULL,0,1) as is_load
         from
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java
index d210be038..325d71522 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/UDFService.java
@@ -52,6 +52,8 @@ public interface UDFService {
   List<UDFInfoVo> getUDFInfoByTreeId(Long treeId, String userName, String category)
       throws UDFException;
 
+  List<UDFInfoVo> getUDFInfoByIds(Long[] ids, String category) throws UDFException;
+
   Map<String, List<String>> generateInitSql(String userName) throws UDFException;
 
   Iterator<String> getAllLoadJars(String userName) throws UDFException;
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java
index 0029b8308..f4ad874fb 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/java/org/apache/linkis/udf/service/impl/UDFServiceImpl.java
@@ -806,6 +806,14 @@ public class UDFServiceImpl implements UDFService {
     return udfDao.getUDFInfoByTreeId(treeId, userName, categoryToCodes.get(category));
   }
 
+  @Override
+  public List<UDFInfoVo> getUDFInfoByIds(Long[] ids, String category) {
+    if (ids == null || ids.length == 0) {
+      return new ArrayList<>(0);
+    }
+    return udfDao.getUDFInfoByIds(ids, categoryToCodes.get(category));
+  }
+
   /**
    * Generate sql needs content: divided into jar, python, scala Save Path and registration syntax
    * separately 生成sql需要内容: 分为jar,python,scala 分别保存Path和注册语法
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala
index 8be45ee5d..501e4d3bc 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiver.scala
@@ -18,7 +18,9 @@
 package org.apache.linkis.udf.api.rpc
 
 import org.apache.linkis.rpc.{Receiver, Sender}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.udf.service.{UDFService, UDFTreeService}
+
+import java.lang
 
 import scala.concurrent.duration.Duration
 
@@ -26,9 +28,12 @@ class UdfReceiver extends Receiver {
 
   private var udfTreeService: UDFTreeService = _
 
-  def this(udfTreeService: UDFTreeService) = {
+  private var udfService: UDFService = _
+
+  def this(udfTreeService: UDFTreeService, udfService: UDFService) = {
     this()
     this.udfTreeService = udfTreeService
+    this.udfService = udfService
   }
 
   override def receive(message: Any, sender: Sender): Unit = {}
@@ -38,6 +43,9 @@ class UdfReceiver extends Receiver {
       case RequestUdfTree(userName, treeType, treeId, treeCategory) =>
         val udfTree = udfTreeService.getTreeById(treeId, userName, treeType, treeCategory)
         new ResponseUdfTree(udfTree)
+      case RequestUdfIds(userName, udfIds, treeCategory) =>
+        val udfs = udfService.getUDFInfoByIds(udfIds.map(id => new lang.Long(id)), treeCategory)
+        new ResponseUdfs(udfs)
       case _ =>
     }
   }
diff --git a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
index 271589e9a..de7558010 100644
--- a/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
+++ b/linkis-public-enhancements/linkis-udf/linkis-udf-service/src/main/scala/org/apache/linkis/udf/api/rpc/UdfReceiverChooser.scala
@@ -18,7 +18,7 @@
 package org.apache.linkis.udf.api.rpc
 
 import org.apache.linkis.rpc.{Receiver, ReceiverChooser, RPCMessageEvent}
-import org.apache.linkis.udf.service.UDFTreeService
+import org.apache.linkis.udf.service.{UDFService, UDFTreeService}
 
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Component
@@ -31,10 +31,13 @@ class UdfReceiverChooser extends ReceiverChooser {
   @Autowired
   private var udfTreeService: UDFTreeService = _
 
+  @Autowired
+  private var udfService: UDFService = _
+
   private var udfReceiver: Option[UdfReceiver] = None
 
   @PostConstruct
-  def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService))
+  def init(): Unit = udfReceiver = Some(new UdfReceiver(udfTreeService, udfService))
 
   override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = event.message match {
     case _: UdfProtocol => udfReceiver


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to