This is an automated email from the ASF dual-hosted git repository.

kinghao pushed a commit to branch 1.9.0-fixhivecurrenc
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 55c9df0c401223b9614cc04337e28ad93c17fdd7
Author: kinghao <[email protected]>
AuthorDate: Tue Nov 18 21:03:34 2025 +0800

    fix UDF loading error in Hive concurrent mode
---
 .../common/conf/EnvConfiguration.scala             |  2 --
 .../entrance/interceptor/impl/SQLExplainTest.java  | 24 ++++++++++------------
 .../HiveEngineConcurrentConnExecutor.scala         |  9 ++------
 .../hive/hook/HiveAddJarsEngineHook.scala          |  8 +++++++-
 4 files changed, 20 insertions(+), 23 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
index 7689ae94af..5a85334f19 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
@@ -19,8 +19,6 @@ package org.apache.linkis.manager.engineplugin.common.conf
 
 import org.apache.linkis.common.conf.{ByteType, CommonVars, Configuration}
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-
 object EnvConfiguration {
 
   val HIVE_CONF_DIR = CommonVars[String](
diff --git 
a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
 
b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
index 49e65e964a..94f2a9b4d4 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
+++ 
b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
@@ -18,6 +18,7 @@
 package org.apache.linkis.entrance.interceptor.impl;
 
 import org.apache.linkis.governance.common.entity.job.JobRequest;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -55,24 +56,21 @@ class SQLExplainTest {
   }
 
   /**
-   * 未修复前代码进行拼接sql时,输出的sql为
-   *      select
-   *      id,
-   *      name,
-   *      array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 
5000;
-   *      ') as infos
-   *      from ods.dim_ep22
+   * 未修复前代码进行拼接sql时,输出的sql为 select id, name,
+   * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 
5000; ') as infos from
+   * ods.dim_ep22
    */
   @Test
   void splicingLimitSql() {
-    String code = "select\n" +
-            "id,\n" +
-            "name,\n" +
-            "array_join(array_intersect(map_keys(info),array['abs','oda'],';') 
as infos\n" +
-            "from ods.dim_ep22";
+    String code =
+        "select\n"
+            + "id,\n"
+            + "name,\n"
+            + 
"array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n"
+            + "from ods.dim_ep22";
     StringBuilder logAppender = new StringBuilder();
     JobRequest jobRequest = new JobRequest();
     SQLExplain.dealSQLLimit(code, jobRequest, logAppender);
-    Assertions.assertEquals(code+" limit 5000", jobRequest.getExecutionCode());
+    Assertions.assertEquals(code + " limit 5000", 
jobRequest.getExecutionCode());
   }
 }
diff --git 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
index 47496cf17a..9fe37eb3d9 100644
--- 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
@@ -135,7 +135,7 @@ class HiveEngineConcurrentConnExecutor(
       code: String
   ): ExecuteResponse = {
     LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code")
-    val taskId: String = engineExecutorContext.getJobId.get
+    val taskId: String = engineExecutorContext.getJobId.getOrElse("udf_init")
     CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
 
     val realCode = code.trim()
@@ -166,12 +166,7 @@ class HiveEngineConcurrentConnExecutor(
 
                 val driver = new HiveDriverProxy(any)
                 driverCache.put(taskId, driver)
-                executeHQL(
-                  engineExecutorContext.getJobId.get,
-                  engineExecutorContext,
-                  realCode,
-                  driver
-                )
+                executeHQL(taskId, engineExecutorContext, realCode, driver)
               case _ =>
                 val resp = proc.run(realCode.substring(tokens(0).length).trim)
                 val result = new String(baos.toByteArray)
diff --git 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
index 9019db9621..cd94df7f73 100644
--- 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
+++ 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
@@ -24,7 +24,10 @@ import 
org.apache.linkis.engineconn.common.engineconn.EngineConn
 import org.apache.linkis.engineconn.common.hook.EngineConnHook
 import 
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
 import org.apache.linkis.engineconn.core.executor.ExecutorManager
-import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
+import org.apache.linkis.engineplugin.hive.executor.{
+  HiveEngineConcurrentConnExecutor,
+  HiveEngineConnExecutor
+}
 import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, 
RunType}
@@ -78,7 +81,10 @@ class HiveAddJarsEngineHook extends EngineConnHook with 
Logging {
           ExecutorManager.getInstance.getExecutorByLabels(labels) match {
             case executor: HiveEngineConnExecutor =>
               executor.executeLine(new EngineExecutionContext(executor), sql)
+            case executor: HiveEngineConcurrentConnExecutor =>
+              executor.executeLine(new EngineExecutionContext(executor), sql)
             case _ =>
+              logger.warn(s"Executor is not a ComputationExecutor, skip adding 
jar: $jar")
           }
         } catch {
           case t: Throwable => logger.error(s"run hive sql ${addSql + jar} 
failed", t)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to