This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ac4e880fb [Feature][Connector-V2][File] Optimize filesystem utils 
(#3749)
ac4e880fb is described below

commit ac4e880fb58ece7e0920c962c11f4d768d6519b1
Author: Tyrantlucifer <[email protected]>
AuthorDate: Sat Dec 17 20:31:21 2022 +0800

    [Feature][Connector-V2][File] Optimize filesystem utils (#3749)
---
 ...TextFileConfig.java => BaseFileSinkConfig.java} |  6 +--
 .../seatunnel/file/config/FileFormat.java          | 26 ++++++------
 .../seatunnel/file/sink/BaseFileSink.java          | 14 ++++---
 .../seatunnel/file/sink/BaseFileSinkWriter.java    | 12 ++++--
 .../sink/commit/FileSinkAggregatedCommitter.java   | 19 ++++-----
 .../file/sink/commit/FileSinkCommitter.java        | 18 +++++---
 ...TextFileSinkConfig.java => FileSinkConfig.java} |  6 +--
 .../seatunnel/file/sink/util/FileSystemUtils.java  | 37 ++++++++++-------
 .../file/sink/writer/AbstractWriteStrategy.java    | 48 +++++++++++++---------
 .../file/sink/writer/JsonWriteStrategy.java        |  7 ++--
 .../file/sink/writer/OrcWriteStrategy.java         |  6 +--
 .../file/sink/writer/ParquetWriteStrategy.java     |  6 +--
 .../file/sink/writer/TextWriteStrategy.java        |  7 ++--
 .../seatunnel/file/sink/writer/WriteStrategy.java  | 25 ++++++++++-
 .../file/sink/writer/WriteStrategyFactory.java     | 10 ++---
 .../hive/commit/HiveSinkAggregatedCommitter.java   |  7 ++--
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  2 +-
 .../commit/S3RedshiftSinkAggregatedCommitter.java  |  9 ++--
 .../seatunnel/redshift/sink/S3RedshiftSink.java    |  2 +-
 19 files changed, 159 insertions(+), 108 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
similarity index 96%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 6171a845d..2befb3ae6 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -35,7 +35,7 @@ import java.io.Serializable;
 import java.util.Locale;
 
 @Data
-public class BaseTextFileConfig implements DelimiterConfig, CompressConfig, 
Serializable {
+public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, 
Serializable {
     private static final long serialVersionUID = 1L;
     protected String compressCodec;
     protected String fieldDelimiter = 
BaseSinkConfig.FIELD_DELIMITER.defaultValue();
@@ -48,7 +48,7 @@ public class BaseTextFileConfig implements DelimiterConfig, 
CompressConfig, Seri
     protected DateTimeUtils.Formatter datetimeFormat = 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
     protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
 
-    public BaseTextFileConfig(@NonNull Config config) {
+    public BaseFileSinkConfig(@NonNull Config config) {
         if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
             throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
                     "Compress not supported by SeaTunnel file connector now");
@@ -94,5 +94,5 @@ public class BaseTextFileConfig implements DelimiterConfig, 
CompressConfig, Seri
         }
     }
 
-    public BaseTextFileConfig() {}
+    public BaseFileSinkConfig() {}
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 4cddc2f4c..0cea4b15e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
@@ -34,9 +34,9 @@ import java.io.Serializable;
 public enum FileFormat implements Serializable {
     CSV("csv") {
         @Override
-        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
-            textFileSinkConfig.setFieldDelimiter(",");
-            return new TextWriteStrategy(textFileSinkConfig);
+        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+            fileSinkConfig.setFieldDelimiter(",");
+            return new TextWriteStrategy(fileSinkConfig);
         }
 
         @Override
@@ -46,8 +46,8 @@ public enum FileFormat implements Serializable {
     },
     TEXT("txt") {
         @Override
-        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
-            return new TextWriteStrategy(textFileSinkConfig);
+        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+            return new TextWriteStrategy(fileSinkConfig);
         }
 
         @Override
@@ -57,8 +57,8 @@ public enum FileFormat implements Serializable {
     },
     PARQUET("parquet") {
         @Override
-        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
-            return new ParquetWriteStrategy(textFileSinkConfig);
+        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+            return new ParquetWriteStrategy(fileSinkConfig);
         }
 
         @Override
@@ -68,8 +68,8 @@ public enum FileFormat implements Serializable {
     },
     ORC("orc") {
         @Override
-        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
-            return new OrcWriteStrategy(textFileSinkConfig);
+        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+            return new OrcWriteStrategy(fileSinkConfig);
         }
 
         @Override
@@ -79,8 +79,8 @@ public enum FileFormat implements Serializable {
     },
     JSON("json") {
         @Override
-        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
-            return new JsonWriteStrategy(textFileSinkConfig);
+        public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+            return new JsonWriteStrategy(fileSinkConfig);
         }
 
         @Override
@@ -103,7 +103,7 @@ public enum FileFormat implements Serializable {
         return null;
     }
 
-    public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
+    public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
         return null;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 6bbb2598b..647fe7214 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -31,8 +31,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
 
@@ -46,7 +47,8 @@ public abstract class BaseFileSink implements 
SeaTunnelSink<SeaTunnelRow, FileSi
     protected SeaTunnelRowType seaTunnelRowType;
     protected Config pluginConfig;
     protected HadoopConf hadoopConf;
-    protected TextFileSinkConfig textFileSinkConfig;
+    protected FileSystemUtils fileSystemUtils;
+    protected FileSinkConfig fileSinkConfig;
     protected WriteStrategy writeStrategy;
     protected JobContext jobContext;
     protected String jobId;
@@ -60,9 +62,11 @@ public abstract class BaseFileSink implements 
SeaTunnelSink<SeaTunnelRow, FileSi
     @Override
     public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         this.seaTunnelRowType = seaTunnelRowType;
-        this.textFileSinkConfig = new TextFileSinkConfig(pluginConfig, 
seaTunnelRowType);
-        this.writeStrategy = 
WriteStrategyFactory.of(textFileSinkConfig.getFileFormat(), textFileSinkConfig);
+        this.fileSinkConfig = new FileSinkConfig(pluginConfig, 
seaTunnelRowType);
+        this.writeStrategy = 
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
+        this.fileSystemUtils = new FileSystemUtils(hadoopConf);
         this.writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        this.writeStrategy.setFileSystemUtils(fileSystemUtils);
     }
 
     @Override
@@ -77,7 +81,7 @@ public abstract class BaseFileSink implements 
SeaTunnelSink<SeaTunnelRow, FileSi
 
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
-        return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
+        return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index d5e499f0e..d0747cb1b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -42,10 +42,14 @@ import java.util.stream.Collectors;
 
 public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
FileCommitInfo, FileSinkState> {
     private final WriteStrategy writeStrategy;
+    private final FileSystemUtils fileSystemUtils;
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf 
hadoopConf, SinkWriter.Context context, String jobId, List<FileSinkState> 
fileSinkStates) {
+    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf 
hadoopConf,
+                              SinkWriter.Context context, String jobId,
+                              List<FileSinkState> fileSinkStates) {
         this.writeStrategy = writeStrategy;
+        this.fileSystemUtils = writeStrategy.getFileSystemUtils();
         int subTaskIndex = context.getIndexOfSubtask();
         String uuidPrefix;
         if (!fileSinkStates.isEmpty()) {
@@ -58,7 +62,7 @@ public class BaseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, FileCommitIn
         if (!fileSinkStates.isEmpty()) {
             try {
                 List<String> transactions = findTransactionList(jobId, 
uuidPrefix);
-                FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(hadoopConf);
+                FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(fileSystemUtils);
                 HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
                 fileSinkStates.forEach(fileSinkState ->
                     fileStatesMap.put(fileSinkState.getTransactionId(), 
fileSinkState));
@@ -87,7 +91,9 @@ public class BaseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, FileCommitIn
     }
 
     private List<String> findTransactionList(String jobId, String uuidPrefix) 
throws IOException {
-        return 
FileSystemUtils.dirList(AbstractWriteStrategy.getTransactionDirPrefix(writeStrategy.getFileSinkConfig().getTmpPath(),
 jobId, uuidPrefix))
+        return 
fileSystemUtils.dirList(AbstractWriteStrategy.getTransactionDirPrefix(writeStrategy
+                                .getFileSinkConfig().getTmpPath(),
+                        jobId, uuidPrefix))
             .stream().map(Path::getName).collect(Collectors.toList());
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 04c79a835..da51f26e1 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -31,12 +30,10 @@ import java.util.Map;
 
 @Slf4j
 public class FileSinkAggregatedCommitter implements 
SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
-    protected final HadoopConf hadoopConf;
+    protected final FileSystemUtils fileSystemUtils;
 
-    public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
-        this.hadoopConf = hadoopConf;
-        FileSystemUtils.CONF = FileSystemUtils.getConfiguration(hadoopConf);
-        log.info("Hadoop configuration initial done, [{}]", hadoopConf);
+    public FileSinkAggregatedCommitter(FileSystemUtils fileSystemUtils) {
+        this.fileSystemUtils = fileSystemUtils;
     }
 
     @Override
@@ -47,10 +44,10 @@ public class FileSinkAggregatedCommitter implements 
SinkAggregatedCommitter<File
                 for (Map.Entry<String, Map<String, String>> entry : 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
                     for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
                         // first rename temp file
-                        FileSystemUtils.renameFile(mvFileEntry.getKey(), 
mvFileEntry.getValue(), true);
+                        fileSystemUtils.renameFile(mvFileEntry.getKey(), 
mvFileEntry.getValue(), true);
                     }
                     // second delete transaction directory
-                    FileSystemUtils.deleteFile(entry.getKey());
+                    fileSystemUtils.deleteFile(entry.getKey());
                 }
             } catch (Exception e) {
                 log.error("commit aggregatedCommitInfo error, 
aggregatedCommitInfo = {} ", aggregatedCommitInfo, e);
@@ -100,12 +97,12 @@ public class FileSinkAggregatedCommitter implements 
SinkAggregatedCommitter<File
                 for (Map.Entry<String, Map<String, String>> entry : 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
                     // rollback the file
                     for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
-                        if (FileSystemUtils.fileExist(mvFileEntry.getValue()) 
&& !FileSystemUtils.fileExist(mvFileEntry.getKey())) {
-                            FileSystemUtils.renameFile(mvFileEntry.getValue(), 
mvFileEntry.getKey(), true);
+                        if (fileSystemUtils.fileExist(mvFileEntry.getValue()) 
&& !fileSystemUtils.fileExist(mvFileEntry.getKey())) {
+                            fileSystemUtils.renameFile(mvFileEntry.getValue(), 
mvFileEntry.getKey(), true);
                         }
                     }
                     // delete the transaction dir
-                    FileSystemUtils.deleteFile(entry.getKey());
+                    fileSystemUtils.deleteFile(entry.getKey());
                 }
             } catch (Exception e) {
                 log.error("abort aggregatedCommitInfo error ", e);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
index 8c5d50e57..7d96c11be 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
@@ -25,8 +25,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Deprecated interface since 2.3.0-beta, now used {@link 
FileSinkAggregatedCommitter}
+ */
 @Deprecated
 public class FileSinkCommitter implements SinkCommitter<FileCommitInfo> {
+    private final FileSystemUtils fileSystemUtils;
+
+    public FileSinkCommitter(FileSystemUtils fileSystemUtils) {
+        this.fileSystemUtils = fileSystemUtils;
+    }
 
     @Override
     public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos) 
throws IOException {
@@ -35,12 +43,12 @@ public class FileSinkCommitter implements 
SinkCommitter<FileCommitInfo> {
             Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
             needMoveFiles.forEach((k, v) -> {
                 try {
-                    FileSystemUtils.renameFile(k, v, true);
+                    fileSystemUtils.renameFile(k, v, true);
                 } catch (IOException e) {
                     failedCommitInfos.add(commitInfo);
                 }
             });
-            FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+            fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
         }
         return failedCommitInfos;
     }
@@ -56,11 +64,11 @@ public class FileSinkCommitter implements 
SinkCommitter<FileCommitInfo> {
         for (FileCommitInfo commitInfo : commitInfos) {
             Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
             for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
-                if (FileSystemUtils.fileExist(entry.getValue()) && 
!FileSystemUtils.fileExist(entry.getKey())) {
-                    FileSystemUtils.renameFile(entry.getValue(), 
entry.getKey(), true);
+                if (fileSystemUtils.fileExist(entry.getValue()) && 
!fileSystemUtils.fileExist(entry.getKey())) {
+                    fileSystemUtils.renameFile(entry.getValue(), 
entry.getKey(), true);
                 }
             }
-            FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+            fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
similarity index 97%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
index 968df88df..3f2d79376 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
@@ -21,8 +21,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseTextFileConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
@@ -42,7 +42,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 @Data
-public class TextFileSinkConfig extends BaseTextFileConfig implements 
PartitionConfig {
+public class FileSinkConfig extends BaseFileSinkConfig implements 
PartitionConfig {
 
     private List<String> sinkColumnList;
 
@@ -64,7 +64,7 @@ public class TextFileSinkConfig extends BaseTextFileConfig 
implements PartitionC
 
     private List<Integer> partitionFieldsIndexInRow;
 
-    public TextFileSinkConfig(@NonNull Config config, @NonNull 
SeaTunnelRowType seaTunnelRowTypeInfo) {
+    public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo) {
         super(config);
         
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index a3e3aa1c3..2ec9f986a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -31,18 +31,24 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
 @Slf4j
-public class FileSystemUtils {
+public class FileSystemUtils implements Serializable {
+    private static final int WRITE_BUFFER_SIZE = 2048;
 
-    public static final int WRITE_BUFFER_SIZE = 2048;
+    private final HadoopConf hadoopConf;
 
-    public static Configuration CONF;
+    private transient Configuration configuration;
 
-    public static Configuration getConfiguration(HadoopConf hadoopConf) {
+    public FileSystemUtils(HadoopConf hadoopConf) {
+        this.hadoopConf = hadoopConf;
+    }
+
+    public Configuration getConfiguration(HadoopConf hadoopConf) {
         Configuration configuration = new Configuration();
         configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
hadoopConf.getHdfsNameKey());
         configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), 
hadoopConf.getFsHdfsImpl());
@@ -50,19 +56,22 @@ public class FileSystemUtils {
         return configuration;
     }
 
-    public static FileSystem getFileSystem(@NonNull String path) throws 
IOException {
-        FileSystem fileSystem = 
FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), CONF);
+    public FileSystem getFileSystem(@NonNull String path) throws IOException {
+        if (configuration == null) {
+            configuration = getConfiguration(hadoopConf);
+        }
+        FileSystem fileSystem = 
FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), configuration);
         fileSystem.setWriteChecksum(false);
         return fileSystem;
     }
 
-    public static FSDataOutputStream getOutputStream(@NonNull String 
outFilePath) throws IOException {
+    public FSDataOutputStream getOutputStream(@NonNull String outFilePath) 
throws IOException {
         FileSystem fileSystem = getFileSystem(outFilePath);
         Path path = new Path(outFilePath);
         return fileSystem.create(path, true, WRITE_BUFFER_SIZE);
     }
 
-    public static void createFile(@NonNull String filePath) throws IOException 
{
+    public void createFile(@NonNull String filePath) throws IOException {
         FileSystem fileSystem = getFileSystem(filePath);
         Path path = new Path(filePath);
         if (!fileSystem.createNewFile(path)) {
@@ -71,7 +80,7 @@ public class FileSystemUtils {
         }
     }
 
-    public static void deleteFile(@NonNull String file) throws IOException {
+    public void deleteFile(@NonNull String file) throws IOException {
         FileSystem fileSystem = getFileSystem(file);
         Path path = new Path(file);
         if (fileSystem.exists(path)) {
@@ -90,7 +99,7 @@ public class FileSystemUtils {
      * @param rmWhenExist if this is true, we will delete the target file when 
it already exists
      * @throws IOException throw IOException
      */
-    public static void renameFile(@NonNull String oldName, @NonNull String 
newName, boolean rmWhenExist)
+    public void renameFile(@NonNull String oldName, @NonNull String newName, 
boolean rmWhenExist)
         throws IOException {
         FileSystem fileSystem = getFileSystem(newName);
         log.info("begin rename file oldName :[" + oldName + "] to newName :[" 
+ newName + "]");
@@ -121,7 +130,7 @@ public class FileSystemUtils {
         }
     }
 
-    public static void createDir(@NonNull String filePath) throws IOException {
+    public void createDir(@NonNull String filePath) throws IOException {
         FileSystem fileSystem = getFileSystem(filePath);
         Path dfs = new Path(filePath);
         if (!fileSystem.mkdirs(dfs)) {
@@ -130,7 +139,7 @@ public class FileSystemUtils {
         }
     }
 
-    public static boolean fileExist(@NonNull String filePath) throws 
IOException {
+    public boolean fileExist(@NonNull String filePath) throws IOException {
         FileSystem fileSystem = getFileSystem(filePath);
         Path fileName = new Path(filePath);
         return fileSystem.exists(fileName);
@@ -139,7 +148,7 @@ public class FileSystemUtils {
     /**
      * get the dir in filePath
      */
-    public static List<Path> dirList(@NonNull String filePath) throws 
IOException {
+    public List<Path> dirList(@NonNull String filePath) throws IOException {
         FileSystem fileSystem = getFileSystem(filePath);
         List<Path> pathList = new ArrayList<>();
         if (!fileExist(filePath)) {
@@ -147,7 +156,7 @@ public class FileSystemUtils {
         }
         Path fileName = new Path(filePath);
         FileStatus[] status = fileSystem.listStatus(fileName);
-        if (status != null && status.length > 0) {
+        if (status != null) {
             for (FileStatus fileStatus : status) {
                 if (fileStatus.isDirectory()) {
                     pathList.add(fileStatus.getPath());
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index e8e92ce9a..264af8af3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -32,12 +32,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import com.google.common.collect.Lists;
-import lombok.Getter;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -61,12 +60,12 @@ import java.util.stream.Collectors;
 
 public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
-    @Getter
-    protected final TextFileSinkConfig textFileSinkConfig;
+    protected final FileSinkConfig fileSinkConfig;
     protected final List<Integer> sinkColumnsIndexInRow;
     protected String jobId;
     protected int subTaskIndex;
     protected HadoopConf hadoopConf;
+    protected FileSystemUtils fileSystemUtils;
     protected String transactionId;
     /**
      * The uuid prefix to make sure same job different file sink will not 
conflict.
@@ -84,10 +83,10 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     protected int batchSize;
     protected int currentBatchSize = 0;
 
-    public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
-        this.textFileSinkConfig = textFileSinkConfig;
-        this.sinkColumnsIndexInRow = 
textFileSinkConfig.getSinkColumnsIndexInRow();
-        this.batchSize = textFileSinkConfig.getBatchSize();
+    public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
+        this.fileSinkConfig = fileSinkConfig;
+        this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
+        this.batchSize = fileSinkConfig.getBatchSize();
     }
 
     /**
@@ -101,7 +100,6 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         this.jobId = jobId;
         this.subTaskIndex = subTaskIndex;
         this.uuidPrefix = uuidPrefix;
-        FileSystemUtils.CONF = getConfiguration(hadoopConf);
     }
 
     @Override
@@ -151,14 +149,14 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      */
     @Override
     public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow) {
-        List<Integer> partitionFieldsIndexInRow = 
textFileSinkConfig.getPartitionFieldsIndexInRow();
+        List<Integer> partitionFieldsIndexInRow = 
fileSinkConfig.getPartitionFieldsIndexInRow();
         Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
         if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
             partitionDirAndValuesMap.put(BaseSinkConfig.NON_PARTITION, null);
             return partitionDirAndValuesMap;
         }
-        List<String> partitionFieldList = 
textFileSinkConfig.getPartitionFieldList();
-        String partitionDirExpression = 
textFileSinkConfig.getPartitionDirExpression();
+        List<String> partitionFieldList = 
fileSinkConfig.getPartitionFieldList();
+        String partitionDirExpression = 
fileSinkConfig.getPartitionDirExpression();
         String[] keys = new String[partitionFieldList.size()];
         String[] values = new String[partitionFieldList.size()];
         for (int i = 0; i < partitionFieldList.size(); i++) {
@@ -197,12 +195,12 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      */
     @Override
     public String generateFileName(String transactionId) {
-        String fileNameExpression = textFileSinkConfig.getFileNameExpression();
-        FileFormat fileFormat = textFileSinkConfig.getFileFormat();
+        String fileNameExpression = fileSinkConfig.getFileNameExpression();
+        FileFormat fileFormat = fileSinkConfig.getFileFormat();
         if (StringUtils.isBlank(fileNameExpression)) {
             return transactionId + fileFormat.getSuffix();
         }
-        String timeFormat = textFileSinkConfig.getFileNameTimeFormat();
+        String timeFormat = fileSinkConfig.getFileNameTimeFormat();
         DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
         String formattedDate = df.format(ZonedDateTime.now());
         Map<String, String> valuesMap = new HashMap<>();
@@ -242,7 +240,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      */
     public void abortPrepare(String transactionId) {
         try {
-            FileSystemUtils.deleteFile(getTransactionDir(transactionId));
+            fileSystemUtils.deleteFile(getTransactionDir(transactionId));
         } catch (IOException e) {
             throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
                     "Abort transaction " + transactionId + " error, delete 
transaction directory failed", e);
@@ -292,7 +290,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      * @return transaction directory
      */
     private String getTransactionDir(@NonNull String transactionId) {
-        String transactionDirectoryPrefix = 
getTransactionDirPrefix(textFileSinkConfig.getTmpPath(), jobId, uuidPrefix);
+        String transactionDirectoryPrefix = 
getTransactionDirPrefix(fileSinkConfig.getTmpPath(), jobId, uuidPrefix);
         return String.join(File.separator, new 
String[]{transactionDirectoryPrefix, transactionId});
     }
 
@@ -321,7 +319,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
 
     public String getTargetLocation(@NonNull String seaTunnelFilePath) {
         String tmpPath = 
seaTunnelFilePath.replaceAll(Matcher.quoteReplacement(transactionDirectory),
-                Matcher.quoteReplacement(textFileSinkConfig.getPath()));
+                Matcher.quoteReplacement(fileSinkConfig.getPath()));
         return tmpPath.replaceAll(BaseSinkConfig.NON_PARTITION + 
Matcher.quoteReplacement(File.separator), "");
     }
 
@@ -331,7 +329,17 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     }
 
     @Override
-    public TextFileSinkConfig getFileSinkConfig() {
-        return textFileSinkConfig;
+    public FileSinkConfig getFileSinkConfig() {
+        return fileSinkConfig;
+    }
+
+    @Override
+    public FileSystemUtils getFileSystemUtils() {
+        return fileSystemUtils;
+    }
+
+    @Override
+    public void setFileSystemUtils(FileSystemUtils fileSystemUtils) {
+        this.fileSystemUtils = fileSystemUtils;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 95585606c..9ae10fd35 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -22,8 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
 import lombok.NonNull;
@@ -39,7 +38,7 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
     private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
     private final Map<String, Boolean> isFirstWrite;
 
-    public JsonWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+    public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) {
         super(textFileSinkConfig);
         this.beingWrittenOutputStream = new HashMap<>();
         this.isFirstWrite = new HashMap<>();
@@ -94,7 +93,7 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
         FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
-                fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+                fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
                 beingWrittenOutputStream.put(filePath, fsDataOutputStream);
                 isFirstWrite.put(filePath, true);
             } catch (IOException e) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index a2e164d3b..af5b774e9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 
 import lombok.NonNull;
 import org.apache.hadoop.fs.Path;
@@ -62,8 +62,8 @@ import java.util.Map;
 public class OrcWriteStrategy extends AbstractWriteStrategy {
     private final Map<String, Writer> beingWrittenWriter;
 
-    public OrcWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
-        super(textFileSinkConfig);
+    public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
+        super(fileSinkConfig);
         this.beingWrittenWriter = new HashMap<>();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 7a23f1b82..6c431605a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 
 import lombok.NonNull;
 import org.apache.avro.Conversions;
@@ -78,8 +78,8 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
         }
     }
 
-    public ParquetWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
-        super(textFileSinkConfig);
+    public ParquetWriteStrategy(FileSinkConfig fileSinkConfig) {
+        super(fileSinkConfig);
         this.beingWrittenWriter = new HashMap<>();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index d3aa838e2..7a6b6b329 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -25,8 +25,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import lombok.NonNull;
@@ -46,7 +45,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
     private final TimeUtils.Formatter timeFormat;
     private SerializationSchema serializationSchema;
 
-    public TextWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+    public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
         super(textFileSinkConfig);
         this.beingWrittenOutputStream = new HashMap<>();
         this.isFirstWrite = new HashMap<>();
@@ -112,7 +111,7 @@ public class TextWriteStrategy extends 
AbstractWriteStrategy {
         FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
-                fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+                fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
                 beingWrittenOutputStream.put(filePath, fsDataOutputStream);
                 isFirstWrite.put(filePath, true);
             } catch (IOException e) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 4d716bf58..2b39f38e6 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -21,7 +21,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -76,7 +77,27 @@ public interface WriteStrategy extends Transaction, 
Serializable {
      */
     void finishAndCloseFile();
 
+    /**
+     * get current checkpoint id
+     * @return checkpoint id
+     */
     long getCheckpointId();
 
-    TextFileSinkConfig getFileSinkConfig();
+    /**
+     * get sink configuration
+     * @return sink configuration
+     */
+    FileSinkConfig getFileSinkConfig();
+
+    /**
+     * get file system utils
+     * @return file system utils
+     */
+    FileSystemUtils getFileSystemUtils();
+
+    /**
+     * set file system utils
+     * @param fileSystemUtils fileSystemUtils
+     */
+    void setFileSystemUtils(FileSystemUtils fileSystemUtils);
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
index 03388cf58..5c7e46fd5 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
@@ -20,7 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -29,17 +29,17 @@ public class WriteStrategyFactory {
 
     private WriteStrategyFactory() {}
 
-    public static WriteStrategy of(String fileType, TextFileSinkConfig 
textFileSinkConfig) {
+    public static WriteStrategy of(String fileType, FileSinkConfig 
fileSinkConfig) {
         try {
             FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
-            return fileFormat.getWriteStrategy(textFileSinkConfig);
+            return fileFormat.getWriteStrategy(fileSinkConfig);
         } catch (IllegalArgumentException e) {
             String errorMsg = String.format("File sink connector not support 
this file type [%s], please check your config", fileType);
             throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
     }
 
-    public static WriteStrategy of(FileFormat fileFormat, TextFileSinkConfig 
textFileSinkConfig) {
-        return fileFormat.getWriteStrategy(textFileSinkConfig);
+    public static WriteStrategy of(FileFormat fileFormat, FileSinkConfig 
fileSinkConfig) {
+        return fileFormat.getWriteStrategy(fileSinkConfig);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 01f5a9d27..b6c22e5f3 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.commit;
 
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -38,8 +38,9 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
     private final String dbName;
     private final String tableName;
 
-    public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName, 
String tableName, HadoopConf hadoopConf) {
-        super(hadoopConf);
+    public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName,
+                                       String tableName, FileSystemUtils 
fileSystemUtils) {
+        super(fileSystemUtils);
         this.pluginConfig = pluginConfig;
         this.dbName = dbName;
         this.tableName = tableName;
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 0e5a60532..914c2b653 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -130,6 +130,6 @@ public class HiveSink extends BaseHdfsFileSink {
 
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
-        return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig, 
dbName, tableName, hadoopConf));
+        return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig, 
dbName, tableName, fileSystemUtils));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index 6e8267451..0010c3a5b 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.redshift.commit;
 
 import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
@@ -44,8 +43,8 @@ public class S3RedshiftSinkAggregatedCommitter extends 
FileSinkAggregatedCommitt
 
     private Config pluginConfig;
 
-    public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config 
pluginConfig) {
-        super(hadoopConf);
+    public S3RedshiftSinkAggregatedCommitter(FileSystemUtils fileSystemUtils, 
Config pluginConfig) {
+        super(fileSystemUtils);
         this.pluginConfig = pluginConfig;
         this.executeSql = 
pluginConfig.getString(S3RedshiftConfig.EXECUTE_SQL.key());
     }
@@ -61,7 +60,7 @@ public class S3RedshiftSinkAggregatedCommitter extends 
FileSinkAggregatedCommitt
                         log.debug("execute redshift sql is:" + sql);
                         
RedshiftJdbcClient.getInstance(pluginConfig).execute(sql);
                         try {
-                            FileSystemUtils.deleteFile(tmpFileEntry.getKey());
+                            fileSystemUtils.deleteFile(tmpFileEntry.getKey());
                         } catch (IOException e) {
                             log.warn("delete tmp file error:" + 
tmpFileEntry.getKey());
                         }
@@ -86,7 +85,7 @@ public class S3RedshiftSinkAggregatedCommitter extends 
FileSinkAggregatedCommitt
             try {
                 for (Map.Entry<String, Map<String, String>> entry : 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
                     // delete the transaction dir
-                    FileSystemUtils.deleteFile(entry.getKey());
+                    fileSystemUtils.deleteFile(entry.getKey());
                 }
             } catch (Exception e) {
                 log.error("abort aggregatedCommitInfo error ", e);
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
index 06debc295..f2978f7ae 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
@@ -62,6 +62,6 @@ public class S3RedshiftSink extends BaseHdfsFileSink {
 
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo>> createAggregatedCommitter() {
-        return Optional.of(new S3RedshiftSinkAggregatedCommitter(hadoopConf, 
pluginConfig));
+        return Optional.of(new 
S3RedshiftSinkAggregatedCommitter(fileSystemUtils, pluginConfig));
     }
 }

Reply via email to