This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9419524 eliminate duplicate code for CheckResult class (#1113)
9419524 is described below
commit 94195249da2255b2e1f5731c94417639936d462f
Author: CenterCode <[email protected]>
AuthorDate: Thu Jan 20 13:56:13 2022 +0800
eliminate duplicate code for CheckResult class (#1113)
* [SeaTunnel #1112][common] eliminate duplicate code for CheckResult class
---
.../seatunnel/flink/batch/FlinkBatchExecution.java | 3 +--
.../flink/stream/FlinkStreamExecution.java | 3 +--
.../seatunnel/flink/util/EnvironmentUtil.java | 9 ++++----
.../apache/seatunnel/spark/SparkEnvironment.scala | 3 +--
.../spark/batch/SparkBatchExecution.scala | 4 +---
.../spark/stream/SparkStreamingExecution.scala | 3 +--
.../StructuredStreamingExecution.scala | 3 +--
.../org/apache/seatunnel/common/Constants.java | 2 --
.../seatunnel/common/config/CheckConfigUtil.java | 16 +++++++--------
.../seatunnel/common/config/CheckResult.java | 24 ++++++++++++++++++++++
.../apache/seatunnel/flink/sink/ConsoleSink.java | 3 +--
.../seatunnel/flink/source/FakeSourceStream.java | 3 +--
.../seatunnel/flink/source/SocketStream.java | 3 +--
.../apache/seatunnel/spark/sink/Clickhouse.scala | 16 ++++++---------
.../org/apache/seatunnel/spark/sink/Console.scala | 5 ++---
.../org/apache/seatunnel/spark/sink/Doris.scala | 4 ++--
.../org/apache/seatunnel/spark/source/Fake.scala | 3 +--
.../apache/seatunnel/spark/sink/FileSinkBase.scala | 8 +++-----
.../org/apache/seatunnel/spark/sink/Kafka.scala | 5 ++---
.../apache/seatunnel/spark/source/MongoDB.scala | 8 +++-----
.../org/apache/seatunnel/spark/sink/Redis.scala | 6 +++---
.../org/apache/seatunnel/spark/source/Redis.scala | 12 +++++------
.../seatunnel/spark/source/SocketStream.scala | 3 +--
.../main/java/org/apache/seatunnel/Seatunnel.java | 2 +-
.../apache/seatunnel/spark/transform/Json.scala | 2 +-
25 files changed, 74 insertions(+), 79 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 6c44969..c7f0cf9 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.flink.batch;
-import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -121,7 +120,7 @@ public class FlinkBatchExecution implements
Execution<FlinkBatchSource, FlinkBat
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, Constants.CHECK_SUCCESS);
+ return CheckResult.success();
}
@Override
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 660cdc0..4326a42 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.flink.stream;
-import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -121,7 +120,7 @@ public class FlinkStreamExecution implements
Execution<FlinkStreamSource, FlinkS
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, Constants.CHECK_SUCCESS);
+ return CheckResult.success();
}
@Override
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
index 1951317..53a01d5 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.flink.util;
-import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.flink.api.common.ExecutionConfig;
@@ -68,18 +67,18 @@ public class EnvironmentUtil {
switch (restartStrategy.toLowerCase()) {
case "fixed-delay":
if (!(config.hasPath(ConfigKeyName.RESTART_ATTEMPTS) &&
config.hasPath(ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS))) {
- return new CheckResult(false,
String.format("fixed-delay restart strategy must set [%s],[%s]",
ConfigKeyName.RESTART_ATTEMPTS, ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS));
+ return CheckResult.error(String.format("fixed-delay
restart strategy must set [%s],[%s]", ConfigKeyName.RESTART_ATTEMPTS,
ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS));
}
break;
case "failure-rate":
if
(!(config.hasPath(ConfigKeyName.RESTART_FAILURE_INTERVAL) &&
config.hasPath(ConfigKeyName.RESTART_FAILURE_RATE) &&
config.hasPath(ConfigKeyName.RESTART_DELAY_INTERVAL))) {
- return new CheckResult(false,
String.format("failure-rate restart strategy must set [%s],[%s],[%s]",
ConfigKeyName.RESTART_FAILURE_INTERVAL, ConfigKeyName.RESTART_FAILURE_RATE,
ConfigKeyName.RESTART_DELAY_INTERVAL));
+ return CheckResult.error(String.format("failure-rate
restart strategy must set [%s],[%s],[%s]",
ConfigKeyName.RESTART_FAILURE_INTERVAL, ConfigKeyName.RESTART_FAILURE_RATE,
ConfigKeyName.RESTART_DELAY_INTERVAL));
}
break;
default:
- return new CheckResult(true, Constants.CHECK_SUCCESS);
+ return CheckResult.success();
}
}
- return new CheckResult(true, Constants.CHECK_SUCCESS);
+ return CheckResult.success();
}
}
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
index f6a430a..761d892 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
@@ -20,7 +20,6 @@ import java.lang
import scala.collection.JavaConversions._
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.env.RuntimeEnv
import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
@@ -40,7 +39,7 @@ class SparkEnvironment extends RuntimeEnv {
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true,
Constants.CHECK_SUCCESS)
+ override def checkConfig(): CheckResult = CheckResult.success()
override def prepare(prepareEnv: lang.Boolean): Unit = {
val sparkConf = createSparkConf()
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
index dad3e3f..8d0640f 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
@@ -23,8 +23,6 @@ import org.apache.seatunnel.spark.{BaseSparkSink,
BaseSparkSource, BaseSparkTran
import org.apache.spark.sql.{Dataset, Row}
import java.util.{List => JList}
-import org.apache.seatunnel.common.Constants
-
import scala.collection.JavaConversions._
class SparkBatchExecution(environment: SparkEnvironment)
@@ -36,7 +34,7 @@ class SparkBatchExecution(environment: SparkEnvironment)
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true,
Constants.CHECK_SUCCESS)
+ override def checkConfig(): CheckResult = CheckResult.success()
override def prepare(prepareEnv: Void): Unit = {}
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index d822782..e5f702b 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -24,7 +24,6 @@ import org.apache.seatunnel.spark.{BaseSparkSink,
BaseSparkSource, BaseSparkTran
import org.apache.spark.sql.{Dataset, Row}
import java.util.{List => JList}
-import org.apache.seatunnel.common.Constants
import scala.collection.JavaConversions._
@@ -78,7 +77,7 @@ class SparkStreamingExecution(sparkEnvironment:
SparkEnvironment)
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true,
Constants.CHECK_SUCCESS)
+ override def checkConfig(): CheckResult = CheckResult.success()
override def prepare(void: Void): Unit = {}
}
diff --git
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
index d12131f..8d92637 100644
---
a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
+++
b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
@@ -22,7 +22,6 @@ import org.apache.seatunnel.env.Execution
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
import java.util.{List => JList}
-import org.apache.seatunnel.common.Constants
class StructuredStreamingExecution(environment: SparkEnvironment)
extends Execution[StructuredStreamingSource, BaseSparkTransform,
StructuredStreamingSink] {
@@ -33,7 +32,7 @@ class StructuredStreamingExecution(environment:
SparkEnvironment)
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true,
Constants.CHECK_SUCCESS)
+ override def checkConfig(): CheckResult = CheckResult.success()
override def prepare(void: Void): Unit = {}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index 11c1c75..c26c94a 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -24,8 +24,6 @@ public final class Constants {
public static final String LOGO = "SeaTunnel";
- public static final String CHECK_SUCCESS = "All check is success";
-
private Constants() {
}
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
index f00cede..f409466 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.common.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import static org.apache.seatunnel.common.Constants.CHECK_SUCCESS;
-
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -39,9 +37,9 @@ public class CheckConfigUtil {
if (missingParams.length() > 0) {
String errorMsg = String.format("please specify [%s] as non-empty",
missingParams.deleteCharAt(missingParams.length() - 1));
- return new CheckResult(false, errorMsg);
+ return CheckResult.error(errorMsg);
} else {
- return new CheckResult(true, CHECK_SUCCESS);
+ return CheckResult.success();
}
}
@@ -50,7 +48,7 @@ public class CheckConfigUtil {
*/
public static CheckResult checkOne(Config config, String... params) {
if (params.length == 0) {
- return new CheckResult(true, "");
+ return CheckResult.success();
}
List<String> missingParams = new LinkedList();
@@ -63,9 +61,9 @@ public class CheckConfigUtil {
if (missingParams.size() == params.length) {
String errorMsg = String.format("please specify at least one
config of [%s] as non-empty",
missingParams.stream().collect(Collectors.joining(",")));
- return new CheckResult(false, errorMsg);
+ return CheckResult.error(errorMsg);
} else {
- return new CheckResult(true, CHECK_SUCCESS);
+ return CheckResult.success();
}
}
@@ -75,10 +73,10 @@ public class CheckConfigUtil {
public static CheckResult mergeCheckMessage(CheckResult... checkResults) {
List<CheckResult> notPassConfig =
Arrays.stream(checkResults).filter(item ->
!item.isSuccess()).collect(Collectors.toList());
if (notPassConfig.isEmpty()) {
- return new CheckResult(true, CHECK_SUCCESS);
+ return CheckResult.success();
} else {
String errMessage =
notPassConfig.stream().map(CheckResult::getMsg).collect(Collectors.joining(","));
- return new CheckResult(false, errMessage);
+ return CheckResult.error(errMessage);
}
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckResult.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckResult.java
index a965dff..3f0d24c 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckResult.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckResult.java
@@ -22,12 +22,36 @@ import lombok.Data;
@Data
public class CheckResult {
+ private static final CheckResult SUCCESS = new CheckResult(true, "");
+
private boolean success;
private String msg;
+ /**
+ * Do not call this constructor directly,
+ * please use {@link #success()} or {@link #error(String)} instead,
+ * will be private in the future
+ */
+ @Deprecated
public CheckResult(boolean success, String msg) {
this.success = success;
this.msg = msg;
}
+
+ /**
+ * @return a successful instance of CheckResult
+ */
+ public static CheckResult success() {
+ return SUCCESS;
+ }
+
+ /**
+ * @param msg the error message
+ * @return an error instance of CheckResult
+ */
+ public static CheckResult error(String msg) {
+ return new CheckResult(false, msg);
+ }
+
}
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
index 151618f..ad716c2 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.flink.sink;
-import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
@@ -59,7 +58,7 @@ public class ConsoleSink extends RichOutputFormat<Row>
implements FlinkBatchSink
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, Constants.CHECK_SUCCESS);
+ return CheckResult.success();
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
b/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
index a0f0cb3..30bfb61 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
@@ -22,7 +22,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.types.Row;
-import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -58,7 +57,7 @@ public class FakeSourceStream extends
RichParallelSourceFunction<Row> implements
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, Constants.CHECK_SUCCESS);
+ return CheckResult.success();
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
b/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
index 4d52d58..af9c50a 100644
---
a/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
+++
b/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.flink.source;
-import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -65,7 +64,7 @@ public class SocketStream implements FlinkStreamSource<Row> {
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, Constants.CHECK_SUCCESS);
+ return CheckResult.success();
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index 811070d..af4f8a7 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -27,7 +27,6 @@ import scala.collection.immutable.HashMap
import scala.util.{Failure, Success, Try}
import scala.util.matching.Regex
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
@@ -99,8 +98,7 @@ class Clickhouse extends SparkBatchSink {
}
if (nonExistsOptions.nonEmpty) {
- new CheckResult(
- false,
+ CheckResult.error(
"please specify " + nonExistsOptions
.map { option =>
val (name, exists) = option
@@ -110,7 +108,7 @@ class Clickhouse extends SparkBatchSink {
} else if (config.hasPath("username") && !config.hasPath("password") ||
config.hasPath(
"password")
&& !config.hasPath("username")) {
- new CheckResult(false, "please specify username and password at the same
time")
+ CheckResult.error("please specify username and password at the same
time")
} else {
this.jdbcLink = String.format(
"jdbc:clickhouse://%s/%s",
@@ -132,7 +130,7 @@ class Clickhouse extends SparkBatchSink {
this.fields = config.getStringList("fields")
acceptedClickHouseSchema()
} else {
- new CheckResult(true, Constants.CHECK_SUCCESS)
+ CheckResult.success()
}
}
}
@@ -182,8 +180,7 @@ class Clickhouse extends SparkBatchSink {
.filter { case (_, exist) => !exist }
if (nonExistsFields.nonEmpty) {
- new CheckResult(
- false,
+ CheckResult.error(
"field " + nonExistsFields
.map { case (option) => "[" + option + "]" }
.mkString(", ") + " not exist in table " + this.table)
@@ -192,13 +189,12 @@ class Clickhouse extends SparkBatchSink {
.map(field => (tableSchema(field),
Clickhouse.supportOrNot(tableSchema(field))))
.filter { case (_, exist) => !exist }
if (nonSupportedType.nonEmpty) {
- new CheckResult(
- false,
+ CheckResult.error(
"clickHouse data type " + nonSupportedType
.map { case (option) => "[" + option + "]" }
.mkString(", ") + " not support in current version.")
} else {
- new CheckResult(true, Constants.CHECK_SUCCESS)
+ CheckResult.success()
}
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala
b/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala
index 9b44ad9..94f4842 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala
@@ -16,7 +16,6 @@
*/
package org.apache.seatunnel.spark.sink
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
@@ -54,9 +53,9 @@ class Console extends SparkBatchSink {
override def checkConfig(): CheckResult = {
!config.hasPath("limit") || (config.hasPath("limit") &&
config.getInt("limit") >= -1) match {
- case true => new CheckResult(true, Constants.CHECK_SUCCESS)
+ case true => CheckResult.success()
case false =>
- new CheckResult(false, "please specify [limit] as Number[-1, " +
Int.MaxValue + "]")
+ CheckResult.error("please specify [limit] as Number[-1, " +
Int.MaxValue + "]")
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
index 3c3a403..9c13c15 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/sink/Doris.scala
@@ -75,7 +75,7 @@ class Doris extends SparkBatchSink with Serializable {
checkResult
} else if (config.hasPath(Config.USER) && !config.hasPath(Config.PASSWORD)
|| config.hasPath(
Config.PASSWORD) && !config.hasPath(Config.USER)) {
- new CheckResult(false, Config.CHECK_USER_ERROR)
+ CheckResult.error(Config.CHECK_USER_ERROR)
} else {
val host: String = config.getString(Config.HOST)
val dataBase: String = config.getString(Config.DATABASE)
@@ -92,7 +92,7 @@ class Doris extends SparkBatchSink with Serializable {
}
}
}
- new CheckResult(true, Config.CHECK_SUCCESS)
+ CheckResult.success()
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
b/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
index b2c2256..c186ded 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
@@ -16,7 +16,6 @@
*/
package org.apache.seatunnel.spark.source
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
@@ -42,6 +41,6 @@ class Fake extends SparkBatchSource {
}
override def checkConfig(): CheckResult = {
- new CheckResult(true, Constants.CHECK_SUCCESS)
+ CheckResult.success()
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
index 65b6a15..a1cd0fc 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
@@ -18,7 +18,6 @@ package org.apache.seatunnel.spark.sink
import java.util
-import org.apache.seatunnel.common.Constants
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
@@ -37,15 +36,14 @@ abstract class FileSinkBase extends SparkBatchSink {
val dir = config.getString("path")
dir.startsWith("/") || uriInAllowedSchema(dir, allowedURISchema) match
{
- case true => new CheckResult(true, Constants.CHECK_SUCCESS)
+ case true => CheckResult.success()
case false =>
- new CheckResult(
- false,
+ CheckResult.error(
"invalid path URI, please set the following allowed schemas: " +
allowedURISchema.mkString(
", "))
}
}
- case false => new CheckResult(false, "please specify [path] as non-empty
string")
+ case false => CheckResult.error("please specify [path] as non-empty
string")
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala
b/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala
index fa33eec..f461ad5 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala
@@ -20,7 +20,6 @@ import java.util.Properties
import scala.collection.JavaConversions._
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
@@ -40,9 +39,9 @@ class Kafka extends SparkBatchSink with Logging {
val producerConfig = TypesafeConfigUtils.extractSubConfig(config,
producerPrefix, false)
config.hasPath("topic") && producerConfig.hasPath("bootstrap.servers")
match {
- case true => new CheckResult(true, Constants.CHECK_SUCCESS)
+ case true => CheckResult.success()
case false =>
- new CheckResult(false, "please specify [topic] and
[producer.bootstrap.servers]")
+ CheckResult.error("please specify [topic] and
[producer.bootstrap.servers]")
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
index 4ceda87..b175f91 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
import com.alibaba.fastjson.JSON
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
@@ -72,12 +71,11 @@ class MongoDB extends SparkBatchSource {
case true =>
val read = TypesafeConfigUtils.extractSubConfig(config, confPrefix,
false)
read.hasPath("uri") && read.hasPath("database") &&
read.hasPath("collection") match {
- case true => new CheckResult(true, Constants.CHECK_SUCCESS)
- case false => new CheckResult(
- false,
+ case true => CheckResult.success()
+ case false => CheckResult.error(
"please specify [readconfig.uri] and [readconfig.database] and
[readconfig.collection]")
}
- case false => new CheckResult(false, "please specify [readconfig]")
+ case false => CheckResult.error("please specify [readconfig]")
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
b/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
index 07948b1..2761c86 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
@@ -60,9 +60,9 @@ class Redis extends SparkBatchSink with Logging {
def checkParam(checkArr: Array[String]): CheckResult = {
val notExistConfig: Array[String] = checkArr.filter(checkItem =>
!config.hasPath(checkItem))
if (notExistConfig.isEmpty) {
- new CheckResult(true, "redis config is enough")
+ CheckResult.success()
} else {
- new CheckResult(false, s"redis config is not enough please check
config [${notExistConfig.mkString(",")}]")
+ CheckResult.error(s"redis config is not enough please check config
[${notExistConfig.mkString(",")}]")
}
}
@@ -76,7 +76,7 @@ class Redis extends SparkBatchSink with Logging {
case RedisSaveType.SET => checkParam(Array(SET_NAME))
case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
- case _ => new CheckResult(false, "Unknown redis config.
redis_save_type must be in [KV HASH SET ZSET LIST]")
+ case _ => CheckResult.error("Unknown redis config. redis_save_type
must be in [KV HASH SET ZSET LIST]")
}
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala
b/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala
index c807ab5..edb02ab 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala
@@ -18,7 +18,6 @@
package org.apache.seatunnel.spark.source
import com.redislabs.provider.redis.{toRedisContext, RedisConfig,
RedisEndpoint}
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
@@ -38,15 +37,14 @@ class Redis extends SparkBatchSource {
config match {
case _ if !hasTableName =>
- new CheckResult(false, "please specify [result_table_name] as
non-empty string")
- case _ if !hasRedisHost => new CheckResult(false, "please specify [host]
as non-empty string")
+ CheckResult.error("please specify [result_table_name] as non-empty
string")
+ case _ if !hasRedisHost => CheckResult.error("please specify [host] as
non-empty string")
case _ if !hasRedisPassword =>
- new CheckResult(false, "please specify [auth] as non-empty string")
+ CheckResult.error("please specify [auth] as non-empty string")
case _ if !hasKeys =>
- new CheckResult(
- false,
+ CheckResult.error(
"please specify [key_pattern] as non-empty string, multiple key
patterns separated by ','")
- case _ => new CheckResult(true, Constants.CHECK_SUCCESS)
+ case _ => CheckResult.success()
}
}
diff --git
a/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala
b/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala
index db93261..ca94fec 100644
---
a/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala
+++
b/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala
@@ -18,7 +18,6 @@ package org.apache.seatunnel.spark.source
import scala.collection.JavaConversions._
-import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
@@ -43,7 +42,7 @@ class SocketStream extends SparkStreamingSource[String] {
}
override def checkConfig(): CheckResult = {
- new CheckResult(true, Constants.CHECK_SUCCESS)
+ CheckResult.success()
}
override def rdd2dataset(sparkSession: SparkSession, rdd: RDD[String]):
Dataset[Row] = {
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
index f28203e..4b48d87 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
@@ -109,7 +109,7 @@ public class Seatunnel {
try {
checkResult = plugin.checkConfig();
} catch (Exception e) {
- checkResult = new CheckResult(false, e.getMessage());
+ checkResult = CheckResult.error(e.getMessage());
}
if (!checkResult.isSuccess()) {
LOGGER.error("Plugin[{}] contains invalid config, error:
{} \n", plugin.getClass().getName(), checkResult.getMsg());
diff --git
a/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
b/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index bd9134b..6e4855b 100644
---
a/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++
b/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -91,7 +91,7 @@ class Json extends BaseSparkTransform {
}
}
- override def checkConfig(): CheckResult = new CheckResult(true,
Constants.CHECK_SUCCESS)
+ override def checkConfig(): CheckResult = CheckResult.success()
override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(