This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 031e8e263 [Improve][Connector-V2][File] Unified exception for file
source & sink connectors (#3525)
031e8e263 is described below
commit 031e8e263cd73b5ab4270aab614048aa8e86eb01
Author: Tyrantlucifer <[email protected]>
AuthorDate: Sat Nov 26 20:37:36 2022 +0800
[Improve][Connector-V2][File] Unified exception for file source & sink
connectors (#3525)
* [Improve][Connector-V2][File] Unified exception for file source & sink
connector
* [Improve][Connector-V2][File] Update error codes manual
---
.../connector-v2/Error-Quick-Reference-Manual.md | 12 ++++++-
.../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java | 6 +++-
.../file/hdfs/source/BaseHdfsFileSource.java | 23 ++++++++----
.../seatunnel/file/config/BaseTextFileConfig.java | 5 ++-
...nException.java => FileConnectorErrorCode.java} | 26 +++++++++++---
...nException.java => FileConnectorException.java} | 17 ++++++---
.../seatunnel/file/sink/BaseFileSinkWriter.java | 7 ++--
.../file/sink/config/TextFileSinkConfig.java | 14 +++++---
.../seatunnel/file/sink/util/FileSystemUtils.java | 17 +++++----
.../file/sink/writer/AbstractWriteStrategy.java | 9 +++--
.../file/sink/writer/JsonWriteStrategy.java | 17 ++++-----
.../file/sink/writer/OrcWriteStrategy.java | 38 +++++++++++++-------
.../file/sink/writer/ParquetWriteStrategy.java | 18 ++++++----
.../file/sink/writer/TextWriteStrategy.java | 13 ++++---
.../seatunnel/file/sink/writer/WriteStrategy.java | 5 +--
.../file/sink/writer/WriteStrategyFactory.java | 4 ++-
.../file/source/BaseFileSourceReader.java | 5 ++-
.../file/source/reader/AbstractReadStrategy.java | 3 +-
.../file/source/reader/JsonReadStrategy.java | 13 ++++---
.../file/source/reader/OrcReadStrategy.java | 42 +++++++++++++---------
.../file/source/reader/ParquetReadStrategy.java | 33 +++++++++--------
.../seatunnel/file/source/reader/ReadStrategy.java | 6 ++--
.../file/source/reader/ReadStrategyFactory.java | 4 ++-
.../file/source/reader/TextReadStrategy.java | 9 ++---
.../seatunnel/file/ftp/sink/FtpFileSink.java | 6 +++-
.../seatunnel/file/ftp/source/FtpFileSource.java | 27 +++++++++-----
.../file/local/source/LocalFileSource.java | 23 ++++++++----
.../seatunnel/file/oss/sink/OssFileSink.java | 6 +++-
.../seatunnel/file/oss/source/OssFileSource.java | 23 ++++++++----
.../seatunnel/file/s3/sink/S3FileSink.java | 6 +++-
.../seatunnel/file/s3/source/S3FileSource.java | 23 ++++++++----
.../seatunnel/file/sftp/sink/SftpFileSink.java | 6 +++-
.../seatunnel/file/sftp/source/SftpFileSource.java | 26 +++++++++-----
33 files changed, 335 insertions(+), 157 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 97000da4f..cf47f0cbc 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -82,12 +82,13 @@ This document records some common error codes and
corresponding solutions of Sea
| SOCKET-02 | Failed to send message to socket server | When
the user encounters this error code, it means that there is a problem sending
data and retry is not enabled, please check |
| SOCKET-03 | Unable to write; interrupted while doing another attempt | When
the user encounters this error code, it means that the data writing is
interrupted abnormally, please check |
-## Tablestore Connector Error Codes
+## TableStore Connector Error Codes
| code | description | solution
|
|----------------|---------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------|
| TABLESTORE-01 | Failed to send these rows of data | When users
encounter this error code, it means that failed to write these rows of data,
please check the rows that failed to import |
+
## Hive Connector Error Codes
| code | description |
solution
|
@@ -122,3 +123,12 @@ This document records some common error codes and
corresponding solutions of Sea
| IOTDB-01 | Close IoTDB session failed | When
the user encounters this error code, it indicates that closing the session
failed. Please check |
| IOTDB-02 | Initialize IoTDB client failed | When
the user encounters this error code, it indicates that the client
initialization failed. Please check |
| IOTDB-03 | Close IoTDB client failed | When
the user encounters this error code, it indicates that closing the client
failed. Please check |
+
+
+## File Connector Error Codes
+
+| code | description | solution
|
+|---------|-----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| FILE-01 | File type is invalid | When users encounter this error
code, it means that this file is not in the format that the user assigned, please
check it |
+| FILE-02 | Data deserialization failed | When users encounter this error
code, it means that the data from files does not satisfy the schema that the user
assigned, please check whether the data from files is correct |
+| FILE-03 | Get file list failed | When users encounter this error
code, it means that the connector tried to traverse the path to get the file list
but failed, please check whether the file system is working |
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index e8cbad443..5870394a0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -20,10 +20,12 @@ package
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -34,7 +36,9 @@ public abstract class BaseHdfsFileSink extends BaseFileSink {
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
FS_DEFAULT_NAME_KEY);
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
}
super.prepare(pluginConfig);
hadoopConf = new
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 38e9fa03b..75e9cea1d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -18,14 +18,17 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
@@ -41,7 +44,9 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HdfsSourceConfig.FILE_PATH.key(),
HdfsSourceConfig.FILE_TYPE.key(),
HdfsSourceConfig.DEFAULT_FS.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
@@ -50,7 +55,8 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
+ String errorMsg = String.format("Get file list from this path [%s]
failed", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg,
e);
}
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()).toUpperCase());
@@ -69,16 +75,19 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
break;
case ORC:
case PARQUET:
- throw new UnsupportedOperationException("SeaTunnel does
not support user-defined schema for [parquet, orc] files");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
default:
// never got in there
- throw new UnsupportedOperationException("SeaTunnel does
not supported this file format");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "SeaTunnel does not supported this file format");
}
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "Read file schema error.", e);
+ } catch (FileConnectorException e) {
+ String errorMsg = String.format("Get table schema from file
[%s] failed", filePaths.get(0));
+ throw new
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
index 3fb4c9989..9fcd01319 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
@@ -19,9 +19,11 @@ package
org.apache.seatunnel.connectors.seatunnel.file.config;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -47,7 +49,8 @@ public class BaseTextFileConfig implements DelimiterConfig,
CompressConfig, Seri
public BaseTextFileConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
- throw new RuntimeException("compress not support now");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Compress not supported by SeaTunnel file connector now");
}
if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key()) &&
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
similarity index 56%
copy from
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index e747c6aea..e966ebcd5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -17,12 +17,28 @@
package org.apache.seatunnel.connectors.seatunnel.file.exception;
-public class FilePluginException extends Exception {
- public FilePluginException(String message) {
- super(message);
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
+ FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
+ DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
+ FILE_LIST_GET_FAILED("FILE-03", "Get file list failed");
+
+ private final String code;
+ private final String description;
+
+ FileConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
}
- public FilePluginException(String message, Throwable cause) {
- super(message, cause);
+ @Override
+ public String getDescription() {
+ return description;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorException.java
similarity index 57%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorException.java
index e747c6aea..39404b01b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorException.java
@@ -17,12 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.file.exception;
-public class FilePluginException extends Exception {
- public FilePluginException(String message) {
- super(message);
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class FileConnectorException extends SeaTunnelRuntimeException {
+ public FileConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public FileConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
}
- public FilePluginException(String message, Throwable cause) {
- super(message, cause);
+ public FileConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 1a57947a4..3f8a203d2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
@@ -61,8 +63,9 @@ public class BaseFileSinkWriter implements
SinkWriter<SeaTunnelRow, FileCommitIn
public void write(SeaTunnelRow element) throws IOException {
try {
writeStrategy.write(element);
- } catch (Exception e) {
- throw new RuntimeException("Write data error, please check", e);
+ } catch (FileConnectorException e) {
+ String errorMsg = String.format("Write this data [%s] to file
failed", element);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
index c56171172..968df88df 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
@@ -20,9 +20,11 @@ package
org.apache.seatunnel.connectors.seatunnel.file.sink.config;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseTextFileConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -107,7 +109,8 @@ public class TextFileSinkConfig extends BaseTextFileConfig
implements PartitionC
if (this.isEnableTransaction &&
!this.fileNameExpression.contains(BaseSinkConfig.TRANSACTION_EXPRESSION)) {
- throw new RuntimeException("file_name_expression must contains " +
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "file_name_expression must contains " +
BaseSinkConfig.TRANSACTION_EXPRESSION + " when
is_enable_transaction is true");
}
@@ -115,17 +118,20 @@ public class TextFileSinkConfig extends
BaseTextFileConfig implements PartitionC
if (!CollectionUtils.isEmpty(this.partitionFieldList)
&& (CollectionUtils.isEmpty(this.sinkColumnList) ||
!new
HashSet<>(this.sinkColumnList).containsAll(this.partitionFieldList))) {
- throw new RuntimeException("partition fields must in sink
columns");
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "partition fields must in sink columns");
}
if (!CollectionUtils.isEmpty(this.partitionFieldList) &&
!isPartitionFieldWriteInFile) {
if (!this.sinkColumnList.removeAll(this.partitionFieldList)) {
- throw new RuntimeException("remove partition field from sink
columns error");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "remove partition field from sink columns error");
}
}
if (CollectionUtils.isEmpty(this.sinkColumnList)) {
- throw new RuntimeException("sink columns can not be empty");
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "sink columns can not be empty");
}
Map<String, Integer> columnsMap = new
HashMap<>(seaTunnelRowTypeInfo.getFieldNames().length);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index dff9ba57e..a3e3aa1c3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -28,7 +30,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -65,7 +66,8 @@ public class FileSystemUtils {
FileSystem fileSystem = getFileSystem(filePath);
Path path = new Path(filePath);
if (!fileSystem.createNewFile(path)) {
- throw new IOException("create file " + filePath + " error");
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ "create file " + filePath + " error");
}
}
@@ -74,7 +76,8 @@ public class FileSystemUtils {
Path path = new Path(file);
if (fileSystem.exists(path)) {
if (!fileSystem.delete(path, true)) {
- throw new IOException("delete file " + file + " error");
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ "delete file " + file + " error");
}
}
}
@@ -113,7 +116,8 @@ public class FileSystemUtils {
if (fileSystem.rename(oldPath, newPath)) {
log.info("rename file :[" + oldPath + "] to [" + newPath + "]
finish");
} else {
- throw new IOException("rename file :[" + oldPath + "] to [" +
newPath + "] error");
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ "rename file :[" + oldPath + "] to [" + newPath + "]
error");
}
}
@@ -121,7 +125,8 @@ public class FileSystemUtils {
FileSystem fileSystem = getFileSystem(filePath);
Path dfs = new Path(filePath);
if (!fileSystem.mkdirs(dfs)) {
- throw new IOException("create dir " + filePath + " error");
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ "create dir " + filePath + " error");
}
}
@@ -134,7 +139,7 @@ public class FileSystemUtils {
/**
* get the dir in filePath
*/
- public static List<Path> dirList(@NonNull String filePath) throws
FileNotFoundException, IOException {
+ public static List<Path> dirList(@NonNull String filePath) throws
IOException {
FileSystem fileSystem = getFileSystem(filePath);
List<Path> pathList = new ArrayList<>();
if (!fileExist(filePath)) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 1eafcb36e..f6baf69cd 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -25,10 +25,12 @@ import static
org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.VariablesSubstitute;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
@@ -222,7 +224,8 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
try {
FileSystemUtils.deleteFile(getTransactionDir(transactionId));
} catch (IOException e) {
- throw new RuntimeException("abort transaction " + transactionId +
" error.", e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ "Abort transaction " + transactionId + " error, delete
transaction directory failed", e);
}
}
@@ -257,7 +260,9 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
.map(dir -> dir.replaceAll(jobDir, ""))
.collect(Collectors.toList());
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ String.format("Get transaction id from states failed," +
+ "it seems that can not get directory list from
[%s]", jobDir), e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index d03a5ae6b..a53163c79 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -20,6 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
@@ -63,8 +65,8 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
}
fsDataOutputStream.write(rowBytes);
} catch (IOException e) {
- log.error("write data to file {} error", filePath);
- throw new RuntimeException(e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ String.format("Write data to file [%s] failed", filePath),
e);
}
}
@@ -74,16 +76,15 @@ public class JsonWriteStrategy extends
AbstractWriteStrategy {
try {
value.flush();
} catch (IOException e) {
- log.error("error when flush file {}", key);
- throw new RuntimeException(e);
+ throw new
FileConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+ String.format("Flush data to this file [%s] failed",
key), e);
} finally {
try {
value.close();
} catch (IOException e) {
- log.error("error when close output stream {}", key, e);
+ log.warn("Close file output stream {} failed", key, e);
}
}
-
needMoveFiles.put(key, getTargetLocation(key));
});
}
@@ -96,8 +97,8 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
isFirstWrite.put(filePath, true);
} catch (IOException e) {
- log.error("can not get output file stream");
- throw new RuntimeException(e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ String.format("Open file output stream [%s] failed",
filePath), e);
}
}
return fsDataOutputStream;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 42dedca14..116eeb5b8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -24,6 +24,8 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import lombok.NonNull;
@@ -84,7 +86,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
rowBatch.reset();
} catch (IOException e) {
String errorMsg = String.format("Write data to orc file [%s]
error", filePath);
- throw new RuntimeException(errorMsg, e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
}
}
@@ -95,7 +97,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
v.close();
} catch (IOException e) {
String errorMsg = String.format("Close file [%s] orc writer
failed, error msg: [%s]", k, e.getMessage());
- throw new RuntimeException(errorMsg, e);
+ throw new
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
}
needMoveFiles.put(k, getTargetLocation(k));
});
@@ -120,7 +122,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
return newWriter;
} catch (IOException e) {
String errorMsg = String.format("Get orc writer for file [%s]
error", filePath);
- throw new RuntimeException(errorMsg, e);
+ throw new
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
}
}
return writer;
@@ -172,7 +174,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
case NULL:
default:
String errorMsg = String.format("Orc file not support this
type [%s]", type.getSqlType());
- throw new UnsupportedOperationException(errorMsg);
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
}
@@ -224,7 +226,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
setStructColumnVector(value, structColumnVector, row);
break;
default:
- throw new RuntimeException("Unexpected ColumnVector
subtype " + vector.type);
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "Unsupported ColumnVector subtype" + vector.type);
}
}
}
@@ -237,7 +240,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
setColumn(fields[i], structColumnVector.fields[i], row);
}
} else {
- throw new RuntimeException("SeaTunnelRow type expected for field");
+ String errorMsg = String.format("SeaTunnelRow type expected for
field, " +
+ "not support this data type: [%s]", value.getClass());
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
}
@@ -258,7 +263,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
++i;
}
} else {
- throw new RuntimeException("Map type expected for field");
+ String errorMsg = String.format("Map type expected for field, this
field is [%s]", value.getClass());
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
@@ -269,7 +275,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
} else if (value instanceof List) {
valueArray = ((List<?>) value).toArray();
} else {
- throw new RuntimeException("List and Array type expected for
field");
+ String errorMsg = String.format("List and Array type expected for
field, " +
+ "this field is [%s]", value.getClass());
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
listColumnVector.offsets[row] = listColumnVector.childCount;
listColumnVector.lengths[row] = valueArray.length;
@@ -285,7 +293,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
if (value instanceof BigDecimal) {
decimalColumnVector.set(row, HiveDecimal.create((BigDecimal)
value));
} else {
- throw new RuntimeException("BigDecimal type expected for field");
+ String errorMsg = String.format("BigDecimal type expected for
field, this field is [%s]", value.getClass());
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
@@ -297,7 +306,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
} else if (value instanceof LocalTime) {
timestampColumnVector.set(row, Timestamp.valueOf(((LocalTime)
value).atDate(LocalDate.ofEpochDay(0))));
} else {
- throw new RuntimeException("Time series type expected for field");
+ String errorMsg = String.format("Time series type expected for
field, this field is [%s]", value.getClass());
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
@@ -319,7 +329,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
} else if (value instanceof LocalDate) {
longVector.vector[row] = ((LocalDate)
value).getLong(ChronoField.EPOCH_DAY);
} else {
- throw new RuntimeException("Long or Integer type expected for
field");
+ String errorMsg = String.format("Long or Integer type expected for
field, " +
+ "this field is [%s]", value.getClass());
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
@@ -341,7 +353,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
Float floatValue = (Float) value;
doubleVector.vector[rowNum] = floatValue.doubleValue();
} else {
- throw new RuntimeException("Double or Float type expected for
field ");
+ String errorMsg = String.format("Double or Float type expected for
field, " +
+ "this field is [%s]", value.getClass());
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index fb9b139ad..0e04ce650 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -24,7 +24,9 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import lombok.NonNull;
@@ -102,7 +104,7 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
writer.write(record);
} catch (IOException e) {
String errorMsg = String.format("Write data to file [%s] error",
filePath);
- throw new RuntimeException(errorMsg, e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
}
}
@@ -113,7 +115,7 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
v.close();
} catch (IOException e) {
String errorMsg = String.format("Close file [%s] parquet
writer failed, error msg: [%s]", k, e.getMessage());
- throw new RuntimeException(errorMsg, e);
+ throw new
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
}
needMoveFiles.put(k, getTargetLocation(k));
});
@@ -147,7 +149,7 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
return newWriter;
} catch (IOException e) {
String errorMsg = String.format("Get parquet writer for file
[%s] error", filePath);
- throw new RuntimeException(errorMsg, e);
+ throw new
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
}
}
return writer;
@@ -197,8 +199,9 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
}
return recordBuilder.build();
default:
- String errorMsg = String.format("SeaTunnel file connector is
not supported for this data type [%s]", seaTunnelDataType.getSqlType());
- throw new UnsupportedOperationException(errorMsg);
+ String errorMsg = String.format("SeaTunnel file connector is
not supported for this data type [%s]",
+ seaTunnelDataType.getSqlType());
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
}
@@ -278,8 +281,9 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
return Types.optionalGroup().addFields(types).named(fieldName);
case NULL:
default:
- String errorMsg = String.format("SeaTunnel file connector is
not supported for this data type [%s]", seaTunnelDataType.getSqlType());
- throw new UnsupportedOperationException(errorMsg);
+ String errorMsg = String.format("SeaTunnel file connector is
not supported for this data type [%s]",
+ seaTunnelDataType.getSqlType());
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index e9579723d..3545acc88 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -20,9 +20,11 @@ package
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -79,8 +81,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
}
fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow));
} catch (IOException e) {
- log.error("write data to file {} error", filePath);
- throw new RuntimeException(e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ String.format("Write data to file [%s] failed", filePath),
e);
}
}
@@ -90,8 +92,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
try {
value.flush();
} catch (IOException e) {
- log.error("error when flush file {}", key);
- throw new RuntimeException(e);
+ throw new
FileConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+ String.format("Flush data to this file [%s] failed",
key), e);
} finally {
try {
value.close();
@@ -114,7 +116,8 @@ public class TextWriteStrategy extends
AbstractWriteStrategy {
isFirstWrite.put(filePath, true);
} catch (IOException e) {
log.error("can not get output file stream");
- throw new RuntimeException(e);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+ String.format("Open file output stream [%s] failed",
filePath), e);
}
}
return fsDataOutputStream;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 55ac378db..c1370345c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.hadoop.conf.Configuration;
@@ -44,9 +45,9 @@ public interface WriteStrategy extends Transaction,
Serializable {
/**
* write seaTunnelRow to target datasource
* @param seaTunnelRow seaTunnelRow
- * @throws Exception Exceptions
+ * @throws FileConnectorException Exceptions
*/
- void write(SeaTunnelRow seaTunnelRow) throws Exception;
+ void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException;
/**
* set seaTunnelRowTypeInfo in writer
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
index 92467ca92..03388cf58 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import lombok.extern.slf4j.Slf4j;
@@ -33,7 +35,7 @@ public class WriteStrategyFactory {
return fileFormat.getWriteStrategy(textFileSinkConfig);
} catch (IllegalArgumentException e) {
String errorMsg = String.format("File sink connector not support
this file type [%s], please check your config", fileType);
- throw new RuntimeException(errorMsg, e);
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index 3b2088b4d..02c213377 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.connectors.seatunnel.file.source;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
@@ -59,7 +61,8 @@ public class BaseFileSourceReader implements
SourceReader<SeaTunnelRow, FileSour
try {
readStrategy.read(source.splitId(), output);
} catch (Exception e) {
- throw new RuntimeException("File source read error", e);
+ String errorMsg = String.format("Read data from this file [%s]
failed", source.splitId());
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
}
});
context.signalNoMoreElement();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 94218250a..5dc3b3207 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -93,7 +92,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
return configuration;
}
- Configuration getConfiguration() throws FilePluginException {
+ Configuration getConfiguration() {
return getConfiguration(hadoopConf);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index e0ff36c14..68106604f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -21,8 +21,9 @@ import
org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.hadoop.conf.Configuration;
@@ -49,7 +50,7 @@ public class JsonReadStrategy extends AbstractReadStrategy {
}
@Override
- public void read(String path, Collector<SeaTunnelRow> output) throws
Exception {
+ public void read(String path, Collector<SeaTunnelRow> output) throws
FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path(path);
@@ -66,14 +67,16 @@ public class JsonReadStrategy extends AbstractReadStrategy {
}
output.collect(seaTunnelRow);
} catch (IOException e) {
- throw new RuntimeException(e);
+ String errorMsg = String.format("Read data from this file
[%s] failed", filePath);
+ throw new
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg);
}
});
}
}
@Override
- public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws FilePluginException {
- throw new UnsupportedOperationException("User must defined schema for
json file type");
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws FileConnectorException {
+ throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "User must defined schema for json file type");
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 52b2abdd2..032f91f5a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -27,8 +27,10 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
@@ -69,9 +71,10 @@ public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;
@Override
- public void read(String path, Collector<SeaTunnelRow> output) throws
Exception {
+ public void read(String path, Collector<SeaTunnelRow> output) throws
FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
- throw new Exception("Please check file type");
+ String errorMsg = String.format("This file [%s] is not a orc file,
please check the format of this file", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
}
Configuration configuration = getConfiguration();
Path filePath = new Path(path);
@@ -113,7 +116,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
}
@Override
- public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws FilePluginException {
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws FileConnectorException {
Configuration configuration = getConfiguration(hadoopConf);
OrcFile.ReaderOptions readerOptions =
OrcFile.readerOptions(configuration);
Path dstDir = new Path(path);
@@ -129,7 +132,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
seaTunnelRowTypeWithPartition = mergePartitionTypes(path,
seaTunnelRowType);
return getActualSeaTunnelRowTypeInfo();
} catch (IOException e) {
- throw new FilePluginException("Create OrcReader Fail", e);
+ String errorMsg = String.format("Create orc reader for this file
[%s] failed", path);
+ throw new
FileConnectorException(CommonErrorCode.READER_OPERATION_FAILED, errorMsg);
}
}
@@ -169,9 +173,9 @@ public class OrcReadStrategy extends AbstractReadStrategy {
}
in.close();
return checkResult;
- } catch (FilePluginException | IOException e) {
- String errorMsg = String.format("Check orc file [%s] error", path);
- throw new UnsupportedOperationException(errorMsg, e);
+ } catch (IOException e) {
+ String errorMsg = String.format("Check orc file [%s] failed",
path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg, e);
}
}
@@ -227,7 +231,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return ArrayType.DOUBLE_ARRAY_TYPE;
default:
String errorMsg = String.format("SeaTunnel array type
not supported this genericType [%s] yet", seaTunnelDataType);
- throw new UnsupportedOperationException(errorMsg);
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
case MAP:
TypeDescription keyType = typeDescription.getChildren().get(0);
@@ -242,7 +246,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
// do nothing
// never get in there
String errorMsg = String.format("SeaTunnel file connector not
supported this orc type [%s] yet", typeDescription.getCategory());
- throw new UnsupportedOperationException(errorMsg);
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
}
@@ -281,7 +285,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
columnObj = readUnionVal(colVec, colType, rowNum);
break;
default:
- throw new UnsupportedOperationException("ReadColumn:
unsupported ORC file column type: " + colVec.type.name());
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "ReadColumn: unsupported ORC file column type: " +
colVec.type.name());
}
}
return columnObj;
@@ -378,7 +383,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
objMap.put(keyList[i], valueList[i]);
}
} else {
- throw new UnsupportedOperationException("readMapVal: unsupported
key or value types");
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "readMapVal: unsupported key or value types");
}
return objMap;
}
@@ -446,7 +452,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
);
break;
default:
- throw new UnsupportedOperationException(mapVector.type.name()
+ " is not supported for MapColumnVectors");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ mapVector.type.name() + " is not supported for
MapColumnVectors");
}
return mapList;
}
@@ -463,10 +470,12 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
Object unionValue = readColumn(fieldVector, fieldType, rowNum);
columnValuePair = Pair.of(fieldType, unionValue);
} else {
- throw new UnsupportedOperationException("readUnionVal: union
tag value out of range for union column vectors");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "readUnionVal: union tag value out of range for union
column vectors");
}
} else {
- throw new UnsupportedOperationException("readUnionVal: union tag
value out of range for union types");
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "readUnionVal: union tag value out of range for union
types");
}
return columnValuePair;
}
@@ -494,7 +503,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
listValues = readTimestampListValues(listVector,
childType, rowNum);
break;
default:
- throw new
UnsupportedOperationException(listVector.type.name() + " is not supported for
ListColumnVectors");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ listVector.type.name() + " is not supported for
ListColumnVectors");
}
}
return listValues;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 212f79821..beddcec88 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -28,8 +28,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
@@ -75,9 +77,10 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
@Override
- public void read(String path, Collector<SeaTunnelRow> output) throws
Exception {
+ public void read(String path, Collector<SeaTunnelRow> output) throws
FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
- throw new Exception("please check file type");
+ String errorMsg = String.format("This file [%s] is not a parquet
file, please check the format of this file", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
}
Path filePath = new Path(path);
Map<String, String> partitionsMap = parsePartitionsByPath(path);
@@ -141,7 +144,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
return origArray.toArray(TYPE_ARRAY_DOUBLE);
default:
String errorMsg = String.format("SeaTunnel array type
not support this type [%s] now", fieldType.getSqlType());
- throw new UnsupportedOperationException(errorMsg);
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
case MAP:
HashMap<Object, Object> dataMap = new HashMap<>();
@@ -193,12 +196,12 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
default:
// do nothing
// never got in there
- throw new UnsupportedOperationException("SeaTunnel not support
this data type now");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel not
support this data type now");
}
}
@Override
- public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws FilePluginException {
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws FileConnectorException {
Path filePath = new Path(path);
ParquetMetadata metadata;
try {
@@ -207,7 +210,8 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
metadata = reader.getFooter();
reader.close();
} catch (IOException e) {
- throw new FilePluginException("Create parquet reader failed", e);
+ String errorMsg = String.format("Create parquet reader for this
file [%s] failed", path);
+ throw new
FileConnectorException(CommonErrorCode.READER_OPERATION_FAILED, errorMsg, e);
}
FileMetaData fileMetaData = metadata.getFileMetaData();
MessageType schema = fileMetaData.getSchema();
@@ -242,7 +246,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
return LocalTimeType.LOCAL_DATE_TYPE;
default:
String errorMsg = String.format("Not support this
type [%s]", type);
- throw new UnsupportedOperationException(errorMsg);
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
case INT64:
if (type.asPrimitiveType().getOriginalType() ==
OriginalType.TIMESTAMP_MILLIS) {
@@ -276,7 +280,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
return new DecimalType(precision, scale);
default:
String errorMsg = String.format("Not support this type
[%s]", type);
- throw new UnsupportedOperationException(errorMsg);
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
} else {
LogicalTypeAnnotation logicalTypeAnnotation =
type.asGroupType().getLogicalTypeAnnotation();
@@ -326,10 +330,11 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
return ArrayType.DOUBLE_ARRAY_TYPE;
default:
String errorMsg = String.format("SeaTunnel
array type not supported this genericType [%s] yet", fieldType);
- throw new
UnsupportedOperationException(errorMsg);
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
default:
- throw new UnsupportedOperationException("SeaTunnel
file connector not support this nest type");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "SeaTunnel file connector not support this
nest type");
}
}
}
@@ -350,9 +355,9 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
checkResult = Arrays.equals(magic, PARQUET_MAGIC);
in.close();
return checkResult;
- } catch (FilePluginException | IOException e) {
- String errorMsg = String.format("Check parquet file [%s] error",
path);
- throw new RuntimeException(errorMsg, e);
+ } catch (IOException e) {
+ String errorMsg = String.format("Check parquet file [%s] failed",
path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 3d1063c51..d8811a40c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -36,9 +36,9 @@ public interface ReadStrategy extends Serializable {
Configuration getConfiguration(HadoopConf conf);
- void read(String path, Collector<SeaTunnelRow> output) throws Exception;
+ void read(String path, Collector<SeaTunnelRow> output) throws IOException,
FileConnectorException;
- SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String
path) throws FilePluginException;
+ SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String
path) throws FileConnectorException;
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
index f12f2c2dc..9e9a47075 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import lombok.extern.slf4j.Slf4j;
@@ -32,7 +34,7 @@ public class ReadStrategyFactory {
return fileFormat.getReadStrategy();
} catch (IllegalArgumentException e) {
String errorMsg = String.format("File source connector not support
this file type [%s], please check your config", fileType);
- throw new RuntimeException(errorMsg);
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
errorMsg);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index e77273a7f..8df21c9d4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -28,7 +28,8 @@ import
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.hadoop.conf.Configuration;
@@ -49,7 +50,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
@Override
- public void read(String path, Collector<SeaTunnelRow> output) throws
IOException, FilePluginException {
+ public void read(String path, Collector<SeaTunnelRow> output) throws
FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path(path);
@@ -66,8 +67,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
}
output.collect(seaTunnelRow);
} catch (IOException e) {
- String errorMsg = String.format("Deserialize this data
[%s] error, please check the origin data", line);
- throw new RuntimeException(errorMsg);
+ String errorMsg = String.format("Deserialize this data
[%s] failed, please check the origin data", line);
+ throw new
FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
errorMsg, e);
}
});
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
index 2f0446374..881b7c6c7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
@@ -18,11 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -44,7 +46,9 @@ public class FtpFileSink extends BaseFileSink {
FtpConfig.FTP_HOST.key(), FtpConfig.FTP_PORT.key(),
FtpConfig.FTP_USERNAME.key(), FtpConfig.FTP_PASSWORD.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
}
super.prepare(pluginConfig);
hadoopConf = FtpConf.buildWithConfig(pluginConfig);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index 681cc6fb2..c17e13ba8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -18,15 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,11 +55,13 @@ public class FtpFileSource extends BaseFileSource {
FtpConfig.FTP_HOST.key(), FtpConfig.FTP_PORT.key(),
FtpConfig.FTP_USERNAME.key(), FtpConfig.FTP_PASSWORD.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
- }
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg())); }
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_TYPE.key()).toUpperCase());
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Ftp file source connector only support read [text, csv, json] files");
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "Ftp file source connector only support read [text, csv,
json] files");
}
readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
@@ -65,7 +70,8 @@ public class FtpFileSource extends BaseFileSource {
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
+ String errorMsg = String.format("Get file list from this path [%s]
failed", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg,
e);
}
// support user-defined schema
// only json type support user-defined schema now
@@ -83,16 +89,19 @@ public class FtpFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
- throw new UnsupportedOperationException("SeaTunnel does
not support user-defined schema for [parquet, orc] files");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
default:
// never got in there
- throw new UnsupportedOperationException("SeaTunnel does
not supported this file format");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "SeaTunnel does not supported this file format");
}
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "Read file schema error.", e);
+ } catch (FileConnectorException e) {
+ String errorMsg = String.format("Get table schema from file
[%s] failed", filePaths.get(0));
+ throw new
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 836610c36..925fb2539 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -18,15 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,7 +55,9 @@ public class LocalFileSource extends BaseFileSource {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
LocalSourceConfig.FILE_PATH.key(),
LocalSourceConfig.FILE_TYPE.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
@@ -61,7 +66,8 @@ public class LocalFileSource extends BaseFileSource {
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
+ String errorMsg = String.format("Get file list from this path [%s]
failed", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg,
e);
}
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()).toUpperCase());
@@ -80,16 +86,19 @@ public class LocalFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
- throw new UnsupportedOperationException("SeaTunnel does
not support user-defined schema for [parquet, orc] files");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
default:
// never got in there
- throw new UnsupportedOperationException("SeaTunnel does
not supported this file format");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "SeaTunnel does not supported this file format");
}
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "Read file schema error.", e);
+ } catch (FileConnectorException e) {
+ String errorMsg = String.format("Get table schema from file
[%s] failed", filePaths.get(0));
+ throw new
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
index dc7dc1bdc..08cf65757 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
@@ -18,11 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -46,7 +48,9 @@ public class OssFileSink extends BaseFileSink {
OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
}
hadoopConf = OssConf.buildWithConfig(pluginConfig);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index d8a98cd0c..452b2c065 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -18,15 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.source;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,7 +55,9 @@ public class OssFileSource extends BaseFileSource {
OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
@@ -61,7 +66,8 @@ public class OssFileSource extends BaseFileSource {
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
+ String errorMsg = String.format("Get file list from this path [%s]
failed", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg,
e);
}
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
@@ -80,16 +86,19 @@ public class OssFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
- throw new UnsupportedOperationException("SeaTunnel does
not support user-defined schema for [parquet, orc] files");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
default:
// never got in there
- throw new UnsupportedOperationException("SeaTunnel does
not supported this file format");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "SeaTunnel does not supported this file format");
}
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "Read file schema error.", e);
+ } catch (FileConnectorException e) {
+ String errorMsg = String.format("Get table schema from file
[%s] failed", filePaths.get(0));
+ throw new
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
index e38e264a0..5f7622575 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
@@ -18,11 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.s3.sink;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -45,7 +47,9 @@ public class S3FileSink extends BaseFileSink {
S3Config.FILE_PATH.key(), S3Config.S3_BUCKET.key(),
S3Config.S3_ACCESS_KEY.key(), S3Config.S3_SECRET_KEY.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
}
hadoopConf = S3Conf.buildWithConfig(pluginConfig);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index ca1792e73..46995347c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -18,15 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.s3.source;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -51,7 +54,9 @@ public class S3FileSource extends BaseFileSource {
S3Config.FILE_PATH.key(), S3Config.FILE_TYPE.key(),
S3Config.S3_BUCKET.key(),
S3Config.S3_ACCESS_KEY.key(), S3Config.S3_SECRET_KEY.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
@@ -60,7 +65,8 @@ public class S3FileSource extends BaseFileSource {
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
+ String errorMsg = String.format("Get file list from this path [%s]
failed", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg,
e);
}
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE.key()).toUpperCase());
@@ -79,16 +85,19 @@ public class S3FileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
- throw new UnsupportedOperationException("SeaTunnel does
not support user-defined schema for [parquet, orc] files");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
default:
// never got in there
- throw new UnsupportedOperationException("SeaTunnel does
not supported this file format");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "SeaTunnel does not supported this file format");
}
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "Read file schema error.", e);
+ } catch (FileConnectorException e) {
+ String errorMsg = String.format("Get table schema from file
[%s] failed", filePaths.get(0));
+ throw new
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
index c6833bc0b..4fa76054a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
@@ -18,11 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -44,7 +46,9 @@ public class SftpFileSink extends BaseFileSink {
SftpConfig.SFTP_HOST.key(), SftpConfig.SFTP_PORT.key(),
SftpConfig.SFTP_USERNAME.key(),
SftpConfig.SFTP_PASSWORD.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
}
super.prepare(pluginConfig);
hadoopConf = SftpConf.buildWithConfig(pluginConfig);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index d03028184..2b5c3cacb 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -18,15 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.sftp.source;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,11 +55,14 @@ public class SftpFileSource extends BaseFileSource {
SftpConfig.SFTP_HOST.key(), SftpConfig.SFTP_PORT.key(),
SftpConfig.SFTP_USERNAME.key(),
SftpConfig.SFTP_PASSWORD.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(SftpConfig.FILE_TYPE.key()).toUpperCase());
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Sftp file source connector only support read [text, csv, json] files");
+ throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "Sftp file source connector only support read [text, csv,
json] files");
}
readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
@@ -65,7 +71,8 @@ public class SftpFileSource extends BaseFileSource {
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
+ String errorMsg = String.format("Get file list from this path [%s]
failed", path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg,
e);
}
// support user-defined schema
// only json csv text type support user-defined schema now
@@ -83,16 +90,19 @@ public class SftpFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
- throw new UnsupportedOperationException("SeaTunnel does
not support user-defined schema for [parquet, orc] files");
+ throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
default:
// never got in there
- throw new UnsupportedOperationException("SeaTunnel does
not supported this file format");
+ throw new
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "SeaTunnel does not supported this file format");
}
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "Read file schema error.", e);
+ } catch (FileConnectorException e) {
+ String errorMsg = String.format("Get table schema from file
[%s] failed", filePaths.get(0));
+ throw new
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
}
}
}