This is an automated email from the ASF dual-hosted git repository.

kinghao pushed a commit to branch 1.9.0-fixhivecurrenc
in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 55c9df0c401223b9614cc04337e28ad93c17fdd7 Author: kinghao <[email protected]> AuthorDate: Tue Nov 18 21:03:34 2025 +0800 fix hive concurrent mode load udf error problems --- .../common/conf/EnvConfiguration.scala | 2 -- .../entrance/interceptor/impl/SQLExplainTest.java | 24 ++++++++++------------ .../HiveEngineConcurrentConnExecutor.scala | 9 ++------ .../hive/hook/HiveAddJarsEngineHook.scala | 8 +++++++- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala index 7689ae94af..5a85334f19 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala @@ -19,8 +19,6 @@ package org.apache.linkis.manager.engineplugin.common.conf import org.apache.linkis.common.conf.{ByteType, CommonVars, Configuration} -import org.apache.commons.lang3.{JavaVersion, SystemUtils} - object EnvConfiguration { val HIVE_CONF_DIR = CommonVars[String]( diff --git a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java index 49e65e964a..94f2a9b4d4 100644 --- a/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java +++ 
b/linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java @@ -18,6 +18,7 @@ package org.apache.linkis.entrance.interceptor.impl; import org.apache.linkis.governance.common.entity.job.JobRequest; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -55,24 +56,21 @@ class SQLExplainTest { } /** - * 未修复前代码进行拼接sql时,输出的sql为 - * select - * id, - * name, - * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000; - * ') as infos - * from ods.dim_ep22 + * 未修复前代码进行拼接sql时,输出的sql为 select id, name, + * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000; ') as infos from + * ods.dim_ep22 */ @Test void splicingLimitSql() { - String code = "select\n" + - "id,\n" + - "name,\n" + - "array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n" + - "from ods.dim_ep22"; + String code = + "select\n" + + "id,\n" + + "name,\n" + + "array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n" + + "from ods.dim_ep22"; StringBuilder logAppender = new StringBuilder(); JobRequest jobRequest = new JobRequest(); SQLExplain.dealSQLLimit(code, jobRequest, logAppender); - Assertions.assertEquals(code+" limit 5000", jobRequest.getExecutionCode()); + Assertions.assertEquals(code + " limit 5000", jobRequest.getExecutionCode()); } } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala index 47496cf17a..9fe37eb3d9 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala @@ -135,7 +135,7 @@ class 
HiveEngineConcurrentConnExecutor( code: String ): ExecuteResponse = { LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code") - val taskId: String = engineExecutorContext.getJobId.get + val taskId: String = engineExecutorContext.getJobId.getOrElse("udf_init") CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf) val realCode = code.trim() @@ -166,12 +166,7 @@ class HiveEngineConcurrentConnExecutor( val driver = new HiveDriverProxy(any) driverCache.put(taskId, driver) - executeHQL( - engineExecutorContext.getJobId.get, - engineExecutorContext, - realCode, - driver - ) + executeHQL(taskId, engineExecutorContext, realCode, driver) case _ => val resp = proc.run(realCode.substring(tokens(0).length).trim) val result = new String(baos.toByteArray) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala index 9019db9621..cd94df7f73 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala @@ -24,7 +24,10 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.common.hook.EngineConnHook import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext import org.apache.linkis.engineconn.core.executor.ExecutorManager -import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor +import org.apache.linkis.engineplugin.hive.executor.{ + HiveEngineConcurrentConnExecutor, + HiveEngineConnExecutor +} import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.label.entity.Label import 
org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType} @@ -78,7 +81,10 @@ class HiveAddJarsEngineHook extends EngineConnHook with Logging { ExecutorManager.getInstance.getExecutorByLabels(labels) match { case executor: HiveEngineConnExecutor => executor.executeLine(new EngineExecutionContext(executor), sql) + case executor: HiveEngineConcurrentConnExecutor => + executor.executeLine(new EngineExecutionContext(executor), sql) case _ => + logger.warn(s"Executor is not a ComputationExecutor, skip adding jar: $jar") } } catch { case t: Throwable => logger.error(s"run hive sql ${addSql + jar} failed", t) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
