This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2347ae847 Fix flink-1.16 ClassNotFoundException bug (#5001)
2347ae847 is described below

commit 2347ae84768cf35247129f72bdaf484d359a5525
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed Nov 29 22:24:56 2023 +0800

    Fix flink-1.16 ClassNotFoundException bug (#5001)
    
    * Fix flink-1.16 ClassNotFoundException bug
    
    * Fix flink-1.16 ClassNotFoundException bug
---
 .../engineconnplugin/flink/config/FlinkEnvConfiguration.scala    | 6 ++++--
 .../engineconnplugin/flink/factory/FlinkEngineConnFactory.scala  | 9 +++++++--
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index 6b521dcee..bcd721c16 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -38,7 +38,7 @@ object FlinkEnvConfiguration {
 
   val FLINK_DIST_JAR_PATH = CommonVars(
     "flink.dist.jar.path",
-    FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar"
+    FLINK_HOME.getValue + s"/lib/flink-dist-${FLINK_VERSION.getValue}.jar"
   )
 
   val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "")
@@ -58,7 +58,9 @@ object FlinkEnvConfiguration {
     "The local lib path of each user in Flink EngineConn."
   )
 
-  val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "")
+  val FLINK_SHIP_DIRECTORIES =
+    CommonVars("flink.yarn.ship-directories", FLINK_HOME.getValue + "/lib")
+
   val FLINK_SHIP_REMOTE_DIRECTORIES = 
CommonVars("flink.yarn.remote.ship-directories", "")
 
   val FLINK_CHECK_POINT_ENABLE = CommonVars("flink.app.checkpoint.enable", 
false)
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index 1c6db3bba..1b9759d84 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -108,7 +108,13 @@ class FlinkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
     val flinkHome = FLINK_HOME.getValue(options)
     val flinkConfDir = FLINK_CONF_DIR.getValue(options)
     val flinkProvidedLibPath = FLINK_PROVIDED_LIB_PATH.getValue(options)
-    val flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
+    val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
+    var flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
+    if (
+        StringUtils.isNotBlank(flinkVersion) && 
flinkVersion.equalsIgnoreCase(FLINK_1_12_2_VERSION)
+    ) {
+      flinkDistJarPath = flinkDistJarPath.replaceFirst("flink-dist", 
"flink-dist_2.11")
+    }
     // Local lib path
     val providedLibDirsArray = 
FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
     // Ship directories
@@ -126,7 +132,6 @@ class FlinkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
       )
     }
     otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, 
flinkClientType.toLowerCase())
-    val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
     FlinkVersionThreadLocal.setFlinkVersion(flinkVersion)
     val context = new EnvironmentContext(
       defaultEnv,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to