This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 3a0e7420d5 fix hive concurrent mode load udf error & code format
(#5289)
3a0e7420d5 is described below
commit 3a0e7420d5919a912c20da80f2fa3f59b7a5b120
Author: aiceflower <[email protected]>
AuthorDate: Wed Nov 19 14:10:02 2025 +0800
fix hive concurrent mode load udf error & code format (#5289)
Co-authored-by: aiceflower <[email protected]>
---
.../linkis/common/conf/DWCArgumentsParser.scala | 2 +-
.../apache/linkis/common/utils/VariableUtils.scala | 15 +++++++++++---
.../linkis/common/variable/VariableType.scala | 7 ++++++-
.../storage/utils/StorageConfiguration.scala | 4 ++--
.../apache/linkis/storage/utils/StorageUtils.scala | 4 +++-
.../common/utils/EngineConnArguments.scala | 2 +-
.../common/conf/EnvConfiguration.scala | 2 --
.../entrance/interceptor/impl/SQLExplainTest.java | 24 ++++++++++------------
.../HiveEngineConcurrentConnExecutor.scala | 9 ++------
.../hive/hook/HiveAddJarsEngineHook.scala | 10 ++++++++-
10 files changed, 47 insertions(+), 32 deletions(-)
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
index 00f91d101b..2a5efbf0d0 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.common.conf
-import org.apache.linkis.common.utils.{ParameterUtils, Logging}
+import org.apache.linkis.common.utils.{Logging, ParameterUtils}
import org.apache.commons.lang3.StringUtils
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
index 09c7669fe4..3f529e0454 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala
@@ -21,11 +21,18 @@ import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.exception.LinkisCommonErrorException
import org.apache.linkis.common.variable
import org.apache.linkis.common.variable._
-import org.apache.linkis.common.variable.DateTypeUtils.{getCurHour,
getMonthDay, getToday, getYesterday}
-import org.apache.commons.lang3.{StringUtils, Strings}
+import org.apache.linkis.common.variable.DateTypeUtils.{
+ getCurHour,
+ getMonthDay,
+ getToday,
+ getYesterday
+}
+
+import org.apache.commons.lang3.{Strings, StringUtils}
import java.time.ZonedDateTime
import java.util
+
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.Exception.allCatch
@@ -115,7 +122,9 @@ object VariableUtils extends Logging {
case _ =>
if (!nameAndType.contains(key) && StringUtils.isNotEmpty(value)) {
// if ((allCatch opt value.toDouble).isDefined) {
- if ((allCatch opt BigDecimal(value)).isDefined &&
!Strings.CS.startsWith(value, "0")) {
+ if (
+ (allCatch opt BigDecimal(value)).isDefined &&
!Strings.CS.startsWith(value, "0")
+ ) {
nameAndType(key) = variable.BigDecimalValue(BigDecimal(value))
} else {
nameAndType(key) = variable.StringType(value)
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
index 75e22cb51b..031d6e79cf 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala
@@ -111,6 +111,7 @@ case class YearType(value: CustomYearType) extends
VariableType {
}
case class BigDecimalValue(value: BigDecimal) extends VariableType {
+
override def getValue: String = {
val result = bigDecimalOrLong(value)
result match {
@@ -126,7 +127,10 @@ case class BigDecimalValue(value: BigDecimal) extends
VariableType {
case "*" => val res = value * BigDecimal(bValue); formatResult(res)
case "/" => val res = value / BigDecimal(bValue); formatResult(res)
case _ =>
- throw new LinkisCommonErrorException(20050, s"BigDecimal class is not
supported to use:$signal")
+ throw new LinkisCommonErrorException(
+ 20050,
+ s"BigDecimal class is not supported to use:$signal"
+ )
}
}
@@ -146,6 +150,7 @@ case class BigDecimalValue(value: BigDecimal) extends
VariableType {
bd
}
}
+
}
case class LongType(value: Long) extends VariableType {
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
index 17345c050a..3785a46045 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
@@ -50,8 +50,8 @@ object StorageConfiguration {
val STORAGE_BUILD_FS_CLASSES = CommonVars(
"wds.linkis.storage.build.fs.classes",
"org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem,"
+
-
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem,"
+
- "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem"
+
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem,"
+
+ "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem"
)
val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true)
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
index a38b0edc4c..f2948496ca 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
@@ -204,7 +204,9 @@ object StorageUtils extends Logging {
* @return
*/
def getFsPath(path: String): FsPath = {
- if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) ||
path.startsWith(BLOB_SCHEMA)) new FsPath(path)
+ if (
+ path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) ||
path.startsWith(BLOB_SCHEMA)
+ ) new FsPath(path)
else {
new FsPath(FILE_SCHEMA + path)
}
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
index adb34b98aa..77b0d0f44a 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.governance.common.utils
-import org.apache.linkis.common.utils.{ParameterUtils, Logging}
+import org.apache.linkis.common.utils.{Logging, ParameterUtils}
import org.apache.commons.lang3.StringUtils
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
index 7689ae94af..5a85334f19 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
@@ -19,8 +19,6 @@ package org.apache.linkis.manager.engineplugin.common.conf
import org.apache.linkis.common.conf.{ByteType, CommonVars, Configuration}
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-
object EnvConfiguration {
val HIVE_CONF_DIR = CommonVars[String](
diff --git
a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
index 49e65e964a..94f2a9b4d4 100644
---
a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
+++
b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java
@@ -18,6 +18,7 @@
package org.apache.linkis.entrance.interceptor.impl;
import org.apache.linkis.governance.common.entity.job.JobRequest;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -55,24 +56,21 @@ class SQLExplainTest {
}
/**
- * 未修复前代码进行拼接sql时,输出的sql为
- * select
- * id,
- * name,
- * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit
5000;
- * ') as infos
- * from ods.dim_ep22
+ * 未修复前代码进行拼接sql时,输出的sql为 select id, name,
+ * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit
5000; ') as infos from
+ * ods.dim_ep22
*/
@Test
void splicingLimitSql() {
- String code = "select\n" +
- "id,\n" +
- "name,\n" +
- "array_join(array_intersect(map_keys(info),array['abs','oda'],';')
as infos\n" +
- "from ods.dim_ep22";
+ String code =
+ "select\n"
+ + "id,\n"
+ + "name,\n"
+ +
"array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n"
+ + "from ods.dim_ep22";
StringBuilder logAppender = new StringBuilder();
JobRequest jobRequest = new JobRequest();
SQLExplain.dealSQLLimit(code, jobRequest, logAppender);
- Assertions.assertEquals(code+" limit 5000", jobRequest.getExecutionCode());
+ Assertions.assertEquals(code + " limit 5000",
jobRequest.getExecutionCode());
}
}
diff --git
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
index 47496cf17a..9fe37eb3d9 100644
---
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
+++
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
@@ -135,7 +135,7 @@ class HiveEngineConcurrentConnExecutor(
code: String
): ExecuteResponse = {
LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code")
- val taskId: String = engineExecutorContext.getJobId.get
+ val taskId: String = engineExecutorContext.getJobId.getOrElse("udf_init")
CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
val realCode = code.trim()
@@ -166,12 +166,7 @@ class HiveEngineConcurrentConnExecutor(
val driver = new HiveDriverProxy(any)
driverCache.put(taskId, driver)
- executeHQL(
- engineExecutorContext.getJobId.get,
- engineExecutorContext,
- realCode,
- driver
- )
+ executeHQL(taskId, engineExecutorContext, realCode, driver)
case _ =>
val resp = proc.run(realCode.substring(tokens(0).length).trim)
val result = new String(baos.toByteArray)
diff --git
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
index 9019db9621..aaa96274d3 100644
---
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
+++
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
@@ -24,7 +24,10 @@ import
org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.hook.EngineConnHook
import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import org.apache.linkis.engineconn.core.executor.ExecutorManager
-import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
+import org.apache.linkis.engineplugin.hive.executor.{
+ HiveEngineConcurrentConnExecutor,
+ HiveEngineConnExecutor
+}
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel,
RunType}
@@ -78,7 +81,12 @@ class HiveAddJarsEngineHook extends EngineConnHook with
Logging {
ExecutorManager.getInstance.getExecutorByLabels(labels) match {
case executor: HiveEngineConnExecutor =>
executor.executeLine(new EngineExecutionContext(executor), sql)
+ logger.info("use hive none concurrent mode.")
+ case executor: HiveEngineConcurrentConnExecutor =>
+ executor.executeLine(new EngineExecutionContext(executor), sql)
+ logger.info("use hive concurrent mode.")
case _ =>
+ logger.warn(s"Executor is not a ComputationExecutor, skip adding
jar: $jar")
}
} catch {
case t: Throwable => logger.error(s"run hive sql ${addSql + jar}
failed", t)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]