This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 0bd23e52d Fix seatunnel log conflicts (#3872)
0bd23e52d is described below
commit 0bd23e52d5f3299d36c1c63034b9b7962be896e8
Author: dlimeng <[email protected]>
AuthorDate: Fri Nov 25 09:51:17 2022 +0800
Fix seatunnel log conflicts (#3872)
* seatunnel once
* seatunnel config
* seatunnel log
---
LICENSE | 2 +-
.../seatunnel/client/LinkisSeatunnelFlinkClient.java | 2 +-
.../seatunnel/client/LinkisSeatunnelFlinkSQLClient.java | 2 +-
.../main/java/org/apache/seatunnel/common/config/Common.java | 1 -
.../org/apache/seatunnel/core/base/config/ConfigBuilder.java | 8 +++-----
.../org/apache/seatunnel/core/base/config/PluginFactory.java | 7 +++----
.../java/org/apache/seatunnel/core/flink/FlinkStarter.java | 7 +++----
.../java/org/apache/seatunnel/core/spark/SparkStarter.java | 7 ++++---
.../java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java | 7 +++----
.../seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala | 2 +-
.../seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala | 7 ++++---
.../engineconnplugin/seatunnel/util/SeatunnelUtils.scala | 11 +++++------
12 files changed, 29 insertions(+), 34 deletions(-)
diff --git a/LICENSE b/LICENSE
index 7b6c85a0f..b9d493e97 100644
--- a/LICENSE
+++ b/LICENSE
@@ -235,7 +235,7 @@ The following file are provided under the Apache 2.0
License.
linkis-web/src/common/i18n/zh.json
linkis-web/src/config.json
linkis-web/public/favicon.ico
-
+ linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/*
The files:
.mvn/wrapper/MavenWrapperDownloader.java
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
index 14cb4bdd7..54c2ec27f 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LinkisSeatunnelFlinkClient {
- private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelSparkClient.class);
+ private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelFlinkClient.class);
private static Class<?> seatunnelEngineClass;
private static JarLoader jarLoader;
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
index 3c18c3ad5..a9526b104 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LinkisSeatunnelFlinkSQLClient {
- private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelSparkClient.class);
+ private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelFlinkSQLClient.class);
private static Class<?> seatunnelEngineClass;
private static JarLoader jarLoader;
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
index 7b0815f80..739af25de 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -66,7 +66,6 @@ public class Common {
if (MODE.equals(Optional.of(DeployMode.CLIENT.getName()))) {
try {
String path = System.getProperty("SEATUNNEL_HOME") + "/seatunnel";
- System.out.println("appRootDir:" + path);
path = new File(path).getPath();
return Paths.get(path);
} catch (Exception e) {
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 796bbebb3..1aa357966 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.core.base.config;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -26,9 +28,6 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import java.nio.file.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Used to build the {@link Config} from file.
*
@@ -36,8 +35,7 @@ import org.slf4j.LoggerFactory;
*/
public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
- private static Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
-
+ public static final Log LOGGER =
LogFactory.getLog(ConfigBuilder.class.getName());
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final Path configFile;
private final EngineType engine;
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
index a582fa2d8..2070106e8 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.core.base.config;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.config.Common;
@@ -40,9 +42,6 @@ import java.net.URLClassLoader;
import java.util.*;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Used to load the plugins.
*
@@ -50,7 +49,7 @@ import org.slf4j.LoggerFactory;
*/
public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
- private static Logger LOGGER = LoggerFactory.getLogger(PluginFactory.class);
+ public static final Log LOGGER =
LogFactory.getLog(PluginFactory.class.getName());
private final Config config;
private final EngineType engineType;
private static final Map<EngineType, Map<PluginType, Class<?>>>
PLUGIN_BASE_CLASS_MAP;
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index e421126cb..ffa7af7bf 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.core.flink;
import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
@@ -27,15 +29,12 @@ import
org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* The SeaTunnel flink starter. This class is responsible for generate the
final flink job execute
* command.
*/
public class FlinkStarter implements Starter {
- private static Logger logger = LoggerFactory.getLogger(FlinkStarter.class);
+ public static final Log logger =
LogFactory.getLog(FlinkStarter.class.getName());
private static final String APP_NAME = SeatunnelFlink.class.getName();
private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index cea35472a..60a0204c7 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.core.spark;
import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
@@ -45,13 +47,12 @@ import java.util.stream.Stream;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.UnixStyleUsageFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
public class SparkStarter implements Starter {
- private static Logger logger = LoggerFactory.getLogger(SparkStarter.class);
+ public static final Log logger =
LogFactory.getLog(SparkStarter.class.getName());
+
private static final int USAGE_EXIT_CODE = 234;
private static final int PLUGIN_LIB_DIR_DEPTH = 3;
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 0e0de0f8c..b7fd2fec6 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.core.sql;
import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
@@ -27,11 +29,8 @@ import
org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class FlinkSqlStarter implements Starter {
- private static Logger logger =
LoggerFactory.getLogger(FlinkSqlStarter.class);
+ public static final Log logger =
LogFactory.getLog(FlinkSqlStarter.class.getName());
private static final String APP_JAR_NAME = "seatunnel-core-flink-sql.jar";
private static final String CLASS_NAME = SeatunnelSql.class.getName();
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
index f55c89900..3e9cb1cda 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
@@ -135,7 +135,7 @@ class SeatunnelFlinkOnceCodeExecutor(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) +
"/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
- info(s"Execute SeatunnelFlink Process end")
+ info(s"Execute SeatunnelFlink Process end args:${args.mkString(" ")}")
LinkisSeatunnelFlinkClient.main(args)
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
index 0d4d56b65..9e0e3312f 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
@@ -48,6 +48,8 @@ import java.nio.file.Files
import java.util
import java.util.concurrent.{Future, TimeUnit}
+import scala.language.postfixOps
+
class SeatunnelSparkOnceCodeExecutor(
override val id: Long,
override protected val seatunnelEngineConnContext:
SeatunnelEngineConnContext
@@ -91,14 +93,13 @@ class SeatunnelSparkOnceCodeExecutor(
protected def runCode(code: String): Int = {
info("Execute SeatunnelSpark Process")
val masterKey = LINKIS_SPARK_MASTER.getValue
- val configKey = LINKIS_SPARK_CONFIG.getValue
val deployModeKey = LINKIS_SPARK_DEPLOY_MODE.getValue
var args: Array[String] = Array.empty
if (params != null && StringUtils.isNotBlank(params.get(masterKey))) {
args = Array(
GET_LINKIS_SPARK_MASTER,
- params.getOrDefault(masterKey, "yarn"),
+ params.getOrDefault(masterKey, "local[4]"),
GET_LINKIS_SPARK_DEPLOY_MODE,
params.getOrDefault(deployModeKey, "client"),
GET_LINKIS_SPARK_CONFIG,
@@ -113,7 +114,7 @@ class SeatunnelSparkOnceCodeExecutor(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) +
"/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
- info(s"Execute SeatunnelSpark Process end")
+ info(s"Execute SeatunnelSpark Process end args:${args.mkString(" ")}")
LinkisSeatunnelSparkClient.main(args)
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
index c0140d6fe..8999e93a4 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
@@ -17,20 +17,20 @@
package org.apache.linkis.engineconnplugin.seatunnel.util
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.engineconn.acessible.executor.log.LogHelper
import
org.apache.linkis.engineconn.common.conf.EngineConnConf.ENGINE_CONN_LOCAL_PATH_PWD_KEY
import
org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelSparkEnvConfiguration
import org.apache.commons.io.IOUtils
+import org.apache.commons.logging.{Log, LogFactory}
import java.io.{BufferedReader, File, InputStreamReader, PrintWriter}
-object SeatunnelUtils extends Logging {
+object SeatunnelUtils {
+ val LOGGER: Log = LogFactory.getLog(SeatunnelUtils.getClass)
private var process: Process = _
def localArray(code: String): Array[String] = {
- Array(SeatunnelSparkEnvConfiguration.LINKIS_SPARK_CONFIG.getValue,
generateExecFile(code))
+ Array(SeatunnelSparkEnvConfiguration.GET_LINKIS_SPARK_CONFIG,
generateExecFile(code))
}
def generateExecFile(code: String): String = {
@@ -54,10 +54,9 @@ object SeatunnelUtils extends Logging {
while ({
line = bufferedReader.readLine(); line != null
}) {
- LogHelper.logCache.cacheLog(line)
+ LOGGER.info(line)
}
val exitcode = process.waitFor()
- logger.info("executeLine exitcode:" + exitcode)
exitcode
} finally {
IOUtils.closeQuietly(bufferedReader)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]