This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a0e7420d5 fix hive concurrent mode load udf error & code format 
(#5289)
3a0e7420d5 is described below

commit 3a0e7420d5919a912c20da80f2fa3f59b7a5b120
Author: aiceflower <[email protected]>
AuthorDate: Wed Nov 19 14:10:02 2025 +0800

    fix hive concurrent mode load udf error & code format (#5289)
    
    Co-authored-by: aiceflower <[email protected]>
---
 .../linkis/common/conf/DWCArgumentsParser.scala    |  2 +-
 .../apache/linkis/common/utils/VariableUtils.scala | 15 +++++++++++---
 .../linkis/common/variable/VariableType.scala      |  7 ++++++-
 .../storage/utils/StorageConfiguration.scala       |  4 ++--
 .../apache/linkis/storage/utils/StorageUtils.scala |  4 +++-
 .../common/utils/EngineConnArguments.scala         |  2 +-
 .../common/conf/EnvConfiguration.scala             |  2 --
 .../entrance/interceptor/impl/SQLExplainTest.java  | 24 ++++++++++------------
 .../HiveEngineConcurrentConnExecutor.scala         |  9 ++------
 .../hive/hook/HiveAddJarsEngineHook.scala          | 10 ++++++++-
 10 files changed, 47 insertions(+), 32 deletions(-)

diff --git 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
index 00f91d101b..2a5efbf0d0 100644
--- 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
+++ 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
@@ -17,7 +17,7 @@
 
 package org.apache.linkis.common.conf
 
-import org.apache.linkis.common.utils.{ParameterUtils, Logging}
+import org.apache.linkis.common.utils.{Logging, ParameterUtils}
 
 import org.apache.commons.lang3.StringUtils
 
diff --git 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
index 09c7669fe4..3f529e0454 100644
--- 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
+++ 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
@@ -21,11 +21,18 @@ import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.exception.LinkisCommonErrorException
 import org.apache.linkis.common.variable
 import org.apache.linkis.common.variable._
-import org.apache.linkis.common.variable.DateTypeUtils.{getCurHour, 
getMonthDay, getToday, getYesterday}
-import org.apache.commons.lang3.{StringUtils, Strings}
+import org.apache.linkis.common.variable.DateTypeUtils.{
+  getCurHour,
+  getMonthDay,
+  getToday,
+  getYesterday
+}
+
+import org.apache.commons.lang3.{Strings, StringUtils}
 
 import java.time.ZonedDateTime
 import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.Exception.allCatch
@@ -115,7 +122,9 @@ object VariableUtils extends Logging {
           case _ =>
             if (!nameAndType.contains(key) && StringUtils.isNotEmpty(value)) {
 //              if ((allCatch opt value.toDouble).isDefined) {
-              if ((allCatch opt BigDecimal(value)).isDefined && 
!Strings.CS.startsWith(value, "0")) {
+              if (
+                  (allCatch opt BigDecimal(value)).isDefined && 
!Strings.CS.startsWith(value, "0")
+              ) {
                 nameAndType(key) = variable.BigDecimalValue(BigDecimal(value))
               } else {
                 nameAndType(key) = variable.StringType(value)
diff --git 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
index 75e22cb51b..031d6e79cf 100644
--- 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
+++ 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
@@ -111,6 +111,7 @@ case class YearType(value: CustomYearType) extends 
VariableType {
 }
 
 case class BigDecimalValue(value: BigDecimal) extends VariableType {
+
   override def getValue: String = {
     val result = bigDecimalOrLong(value)
     result match {
@@ -126,7 +127,10 @@ case class BigDecimalValue(value: BigDecimal) extends 
VariableType {
       case "*" => val res = value * BigDecimal(bValue); formatResult(res)
       case "/" => val res = value / BigDecimal(bValue); formatResult(res)
       case _ =>
-        throw new LinkisCommonErrorException(20050, s"BigDecimal class is not 
supported to use:$signal")
+        throw new LinkisCommonErrorException(
+          20050,
+          s"BigDecimal class is not supported to use:$signal"
+        )
     }
   }
 
@@ -146,6 +150,7 @@ case class BigDecimalValue(value: BigDecimal) extends 
VariableType {
       bd
     }
   }
+
 }
 
 case class LongType(value: Long) extends VariableType {
diff --git 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
index 17345c050a..3785a46045 100644
--- 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
+++ 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
@@ -50,8 +50,8 @@ object StorageConfiguration {
   val STORAGE_BUILD_FS_CLASSES = CommonVars(
     "wds.linkis.storage.build.fs.classes",
     
"org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem,"
 +
-    
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem,"
 +
-    "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem"
+      
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem,"
 +
+      "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem"
   )
 
   val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true)
diff --git 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
index a38b0edc4c..f2948496ca 100644
--- 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
+++ 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
@@ -204,7 +204,9 @@ object StorageUtils extends Logging {
    * @return
    */
   def getFsPath(path: String): FsPath = {
-    if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || 
path.startsWith(BLOB_SCHEMA)) new FsPath(path)
+    if (
+        path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || 
path.startsWith(BLOB_SCHEMA)
+    ) new FsPath(path)
     else {
       new FsPath(FILE_SCHEMA + path)
     }
diff --git 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
index adb34b98aa..77b0d0f44a 100644
--- 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
+++ 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
@@ -17,7 +17,7 @@
 
 package org.apache.linkis.governance.common.utils
 
-import org.apache.linkis.common.utils.{ParameterUtils, Logging}
+import org.apache.linkis.common.utils.{Logging, ParameterUtils}
 
 import org.apache.commons.lang3.StringUtils
 
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
index 7689ae94af..5a85334f19 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
@@ -19,8 +19,6 @@ package org.apache.linkis.manager.engineplugin.common.conf
 
 import org.apache.linkis.common.conf.{ByteType, CommonVars, Configuration}
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-
 object EnvConfiguration {
 
   val HIVE_CONF_DIR = CommonVars[String](
diff --git 
a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
 
b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
index 49e65e964a..94f2a9b4d4 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
+++ 
b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
@@ -18,6 +18,7 @@
 package org.apache.linkis.entrance.interceptor.impl;
 
 import org.apache.linkis.governance.common.entity.job.JobRequest;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -55,24 +56,21 @@ class SQLExplainTest {
   }
 
   /**
-   * 未修复前代码进行拼接sql时,输出的sql为
-   *      select
-   *      id,
-   *      name,
-   *      array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 
5000;
-   *      ') as infos
-   *      from ods.dim_ep22
+   * 未修复前代码进行拼接sql时,输出的sql为 select id, name,
+   * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 
5000; ') as infos from
+   * ods.dim_ep22
    */
   @Test
   void splicingLimitSql() {
-    String code = "select\n" +
-            "id,\n" +
-            "name,\n" +
-            "array_join(array_intersect(map_keys(info),array['abs','oda'],';') 
as infos\n" +
-            "from ods.dim_ep22";
+    String code =
+        "select\n"
+            + "id,\n"
+            + "name,\n"
+            + 
"array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n"
+            + "from ods.dim_ep22";
     StringBuilder logAppender = new StringBuilder();
     JobRequest jobRequest = new JobRequest();
     SQLExplain.dealSQLLimit(code, jobRequest, logAppender);
-    Assertions.assertEquals(code+" limit 5000", jobRequest.getExecutionCode());
+    Assertions.assertEquals(code + " limit 5000", 
jobRequest.getExecutionCode());
   }
 }
diff --git 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
index 47496cf17a..9fe37eb3d9 100644
--- 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
@@ -135,7 +135,7 @@ class HiveEngineConcurrentConnExecutor(
       code: String
   ): ExecuteResponse = {
     LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code")
-    val taskId: String = engineExecutorContext.getJobId.get
+    val taskId: String = engineExecutorContext.getJobId.getOrElse("udf_init")
     CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
 
     val realCode = code.trim()
@@ -166,12 +166,7 @@ class HiveEngineConcurrentConnExecutor(
 
                 val driver = new HiveDriverProxy(any)
                 driverCache.put(taskId, driver)
-                executeHQL(
-                  engineExecutorContext.getJobId.get,
-                  engineExecutorContext,
-                  realCode,
-                  driver
-                )
+                executeHQL(taskId, engineExecutorContext, realCode, driver)
               case _ =>
                 val resp = proc.run(realCode.substring(tokens(0).length).trim)
                 val result = new String(baos.toByteArray)
diff --git 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
index 9019db9621..aaa96274d3 100644
--- 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
+++ 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
@@ -24,7 +24,10 @@ import 
org.apache.linkis.engineconn.common.engineconn.EngineConn
 import org.apache.linkis.engineconn.common.hook.EngineConnHook
 import 
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
 import org.apache.linkis.engineconn.core.executor.ExecutorManager
-import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
+import org.apache.linkis.engineplugin.hive.executor.{
+  HiveEngineConcurrentConnExecutor,
+  HiveEngineConnExecutor
+}
 import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, 
RunType}
@@ -78,7 +81,12 @@ class HiveAddJarsEngineHook extends EngineConnHook with 
Logging {
           ExecutorManager.getInstance.getExecutorByLabels(labels) match {
             case executor: HiveEngineConnExecutor =>
               executor.executeLine(new EngineExecutionContext(executor), sql)
+              logger.info("use hive none concurrent mode.")
+            case executor: HiveEngineConcurrentConnExecutor =>
+              executor.executeLine(new EngineExecutionContext(executor), sql)
+              logger.info("use hive concurrent mode.")
             case _ =>
+              logger.warn(s"Executor is not a ComputationExecutor, skip adding 
jar: $jar")
           }
         } catch {
           case t: Throwable => logger.error(s"run hive sql ${addSql + jar} 
failed", t)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to