This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0b9c4acdb2 [Feature][connector-file-local]localfile supports splitting
and parallel reading of large files of csv, text, and json (#10142)
0b9c4acdb2 is described below
commit 0b9c4acdb2cfa76ef6e19c9cbfc9bbd81f890ad1
Author: 老王 <[email protected]>
AuthorDate: Mon Dec 22 21:20:55 2025 +0800
[Feature][connector-file-local]localfile supports splitting and parallel
reading of large files of csv, text, and json (#10142)
---
docs/en/connector-v2/source/LocalFile.md | 12 +-
docs/zh/connector-v2/source/LocalFile.md | 10 ++
.../seatunnel/file/config/FileBaseOptions.java | 13 ++
.../file/source/BaseMultipleTableFileSource.java | 13 +-
.../file/source/reader/AbstractReadStrategy.java | 48 ++++---
.../file/source/reader/CsvReadStrategy.java | 42 ++++--
.../file/source/reader/ExcelReadStrategy.java | 9 +-
.../file/source/reader/JsonReadStrategy.java | 20 ++-
.../reader/MultipleTableFileSourceReader.java | 2 +-
.../seatunnel/file/source/reader/ReadStrategy.java | 6 +
.../file/source/reader/TextReadStrategy.java | 30 ++--
.../file/source/reader/XmlReadStrategy.java | 13 +-
.../split/AccordingToSplitSizeSplitStrategy.java | 160 +++++++++++++++++++++
.../source/split/DefaultFileSplitStrategy.java} | 21 +--
.../file/source/split/FileSourceSplit.java | 18 ++-
.../file/source/split/FileSplitStrategy.java} | 20 +--
.../MultipleTableFileSourceSplitEnumerator.java | 9 +-
...MultipleTableFileSourceSplitEnumeratorTest.java | 2 +-
.../file/local/source/LocalFileSource.java | 28 +++-
.../file/local/source/LocalFileSourceFactory.java | 8 ++
...LocalFileAccordingToSplitSizeSplitStrategy.java | 59 ++++++++
.../file/local/SplitFileStrategyTest.java | 155 ++++++++++++++++++++
.../src/test/resources/test_split_csv_data.csv | 9 ++
.../src/test/resources/test_split_empty_data.csv | 0
.../test_split_special_row_delimiter_data.txt | 1 +
.../reader/MultipleTableHiveSourceReader.java | 2 +-
.../e2e/connector/file/local/LocalFileIT.java | 3 +
.../csv/local_csv_enable_split_to_assert.conf | 88 ++++++++++++
.../local_file_json_enable_split_to_assert.conf | 132 +++++++++++++++++
.../local_file_text_enable_split_to_assert.conf | 132 +++++++++++++++++
30 files changed, 974 insertions(+), 91 deletions(-)
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index c9338b3cbf..f0397c621d 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -64,7 +64,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| skip_header_row_number | long | no | 0
|
| schema | config | no | -
|
| sheet_name | string | no | -
|
-| excel_engine | string | no | POI
|
+| excel_engine | string | no | POI
|
| xml_row_tag | string | no | -
|
| xml_use_attr_format | boolean | no | -
|
| csv_use_header_line | boolean | no | false
|
@@ -80,6 +80,8 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| tables_configs | list | no | used to define a multiple
table task |
| file_filter_modified_start | string | no | -
|
| file_filter_modified_end | string | no | -
|
+| enable_file_split | boolean | no | false
|
+| file_split_size | long | no | 134217728
|
### path [string]
@@ -415,6 +417,14 @@ File modification time filter. The connector will filter
some files base on the
File modification time filter. The connector will filter some files base on
the last modification end time (not include end time). The default data format
is `yyyy-MM-dd HH:mm:ss`.
+### enable_file_split [boolean]
+
+Enable file splitting; the default is false. It can be enabled when the file type is csv, text, or json and the file is not compressed.
+
+### file_split_size [long]
+
+File split size in bytes. It only takes effect when `enable_file_split` is true. The default is 128 MB, i.e. 134217728 bytes.
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
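For orientation, a minimal usage sketch of the two new options in a LocalFile source block (only enable_file_split and file_split_size come from this change; the path, format, and values are illustrative):

source {
  LocalFile {
    path = "/data/large_input.csv"
    file_format_type = "csv"
    # split the file into byte ranges that readers consume in parallel
    enable_file_split = true
    # 134217728 bytes = 128 MB, which is also the default
    file_split_size = 134217728
  }
}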
diff --git a/docs/zh/connector-v2/source/LocalFile.md
b/docs/zh/connector-v2/source/LocalFile.md
index b2eb9b86f5..7f0432da4b 100644
--- a/docs/zh/connector-v2/source/LocalFile.md
+++ b/docs/zh/connector-v2/source/LocalFile.md
@@ -80,6 +80,8 @@ import ChangeLog from '../changelog/connector-file-local.md';
| tables_configs | list | 否 | 用于定义多表任务 |
| file_filter_modified_start | string | 否 | - |
| file_filter_modified_end | string | 否 | - |
+| enable_file_split | boolean | 否 | false |
+| file_split_size | long | 否 | 134217728 |
### path [string]
@@ -415,6 +417,14 @@ null_format 定义哪些字符串可以表示为 null。
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`。
+### enable_file_split [boolean]
+
+开启文件分割功能,默认为false。文件类型为csv、text、json、非压缩格式时可选择。
+
+### file_split_size [long]
+
+文件分割大小,enable_file_split参数为true时可以填写。单位是字节数。默认值为128MB的字节数,即134217728。
+
### 通用选项
数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
index 2f6728c6d4..b88b64b31d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
@@ -150,4 +150,17 @@ public class FileBaseOptions extends
ConnectorCommonOptions {
.enumType(ArchiveCompressFormat.class)
.defaultValue(ArchiveCompressFormat.NONE)
.withDescription("Archive compression codec");
+
+ public static final Option<Boolean> ENABLE_FILE_SPLIT =
+ Options.key("enable_file_split")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Turn on the file splitting function, the
default is false");
+
+ public static final Option<Long> FILE_SPLIT_SIZE =
+ Options.key("file_split_size")
+ .longType()
+ .defaultValue(128 * 1024 * 1024L)
+ .withDescription(
+ "File split size in bytes. It only takes effect when enable_file_split is true. The default is 128 MB, i.e. 128 * 1024 * 1024 bytes.");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
index 0ad3f3a62b..6626ccd2a9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
@@ -28,7 +28,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSourceSplitEnumerator;
import
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
@@ -41,10 +43,19 @@ public abstract class BaseMultipleTableFileSource
SupportColumnProjection {
private final BaseMultipleTableFileSourceConfig
baseMultipleTableFileSourceConfig;
+ private final FileSplitStrategy fileSplitStrategy;
public BaseMultipleTableFileSource(
BaseMultipleTableFileSourceConfig
baseMultipleTableFileSourceConfig) {
this.baseMultipleTableFileSourceConfig =
baseMultipleTableFileSourceConfig;
+ this.fileSplitStrategy = new DefaultFileSplitStrategy();
+ }
+
+ public BaseMultipleTableFileSource(
+ BaseMultipleTableFileSourceConfig
baseMultipleTableFileSourceConfig,
+ FileSplitStrategy fileSplitStrategy) {
+ this.baseMultipleTableFileSourceConfig =
baseMultipleTableFileSourceConfig;
+ this.fileSplitStrategy = fileSplitStrategy;
}
@Override
@@ -72,7 +83,7 @@ public abstract class BaseMultipleTableFileSource
public SourceSplitEnumerator<FileSourceSplit, FileSourceState>
createEnumerator(
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) {
return new MultipleTableFileSourceSplitEnumerator(
- enumeratorContext, baseMultipleTableFileSourceConfig);
+ enumeratorContext, baseMultipleTableFileSourceConfig,
fileSplitStrategy);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 3bcda9bd45..dc1641e5f6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -26,16 +26,19 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipParameters;
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FileStatus;
import lombok.extern.slf4j.Slf4j;
@@ -94,6 +97,8 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected Date fileModifiedEndDate;
protected String fileBasePath;
+ protected boolean enableSplitFile;
+
@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -242,6 +247,10 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
pluginConfig.getString(
FileBaseSourceOptions.FILE_FILTER_MODIFIED_END.key()));
}
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key())) {
+ enableSplitFile =
+
pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key());
+ }
}
@Override
@@ -250,12 +259,13 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
protected void resolveArchiveCompressedInputStream(
- String path,
- String tableId,
+ FileSourceSplit split,
Collector<SeaTunnelRow> output,
Map<String, String> partitionsMap,
FileFormat fileFormat)
throws IOException {
+ String path = split.getFilePath();
+ String tableId = split.getTableId();
switch (archiveCompressFormat) {
case ZIP:
try (ZipInputStream zis =
@@ -264,8 +274,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory() &&
checkFileType(entry.getName(), fileFormat)) {
readProcess(
- path,
- tableId,
+ split,
output,
copyInputStream(zis),
partitionsMap,
@@ -282,8 +291,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
while ((entry = tarInput.getNextTarEntry()) != null) {
if (!entry.isDirectory() &&
checkFileType(entry.getName(), fileFormat)) {
readProcess(
- path,
- tableId,
+ split,
output,
copyInputStream(tarInput),
partitionsMap,
@@ -302,8 +310,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
while ((entry = tarIn.getNextTarEntry()) != null) {
if (!entry.isDirectory() &&
checkFileType(entry.getName(), fileFormat)) {
readProcess(
- path,
- tableId,
+ split,
output,
copyInputStream(tarIn),
partitionsMap,
@@ -331,13 +338,11 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
fileName = path;
}
}
- readProcess(
- path, tableId, output, copyInputStream(gzipIn),
partitionsMap, fileName);
+ readProcess(split, output, copyInputStream(gzipIn),
partitionsMap, fileName);
break;
case NONE:
readProcess(
- path,
- tableId,
+ split,
output,
hadoopFileSystemProxy.getInputStream(path),
partitionsMap,
@@ -348,8 +353,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
"The file does not support this archive compress type:
{}",
archiveCompressFormat);
readProcess(
- path,
- tableId,
+ split,
output,
hadoopFileSystemProxy.getInputStream(path),
partitionsMap,
@@ -358,8 +362,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
protected void readProcess(
- String path,
- String tableId,
+ FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -451,6 +454,19 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
return false;
}
+ protected static InputStream safeSlice(InputStream in, long start, long
length)
+ throws IOException {
+ long toSkip = start;
+ while (toSkip > 0) {
+ long skipped = in.skip(toSkip);
+ if (skipped <= 0) {
+ throw new SeaTunnelException("Failed to skip to the split start offset: the stream ended prematurely");
+ }
+ toSkip -= skipped;
+ }
+ return new BoundedInputStream(in, length);
+ }
+
@Override
public void close() throws IOException {
try {
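A minimal, self-contained sketch of what safeSlice does with a split's start and length (the sample data and class name are invented for illustration; BoundedInputStream is the commons-io class imported above):

import org.apache.commons.io.input.BoundedInputStream;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class SafeSliceSketch {
    public static void main(String[] args) throws IOException {
        // "file" bytes: three rows, 5 bytes each
        byte[] data = "row1\nrow2\nrow3\n".getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(data);

        long start = 5;   // byte offset of the second row
        long length = 5;  // one row including its trailing '\n'

        // skip() may skip fewer bytes than requested, so loop as safeSlice does
        long toSkip = start;
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                throw new IOException("stream ended before the split start offset");
            }
            toSkip -= skipped;
        }

        // bound the remaining stream to the split length
        InputStream slice = new BoundedInputStream(in, length);
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(slice, StandardCharsets.UTF_8))) {
            System.out.println(reader.readLine()); // prints "row2"
        }
    }
}

The loop around skip() matters because InputStream.skip may legitimately skip fewer bytes than requested.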
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index 3df935e9b2..034563f94d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -33,6 +33,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptio
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.format.csv.CsvDeserializationSchema;
import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
@@ -75,13 +76,20 @@ public class CsvReadStrategy extends AbstractReadStrategy {
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
- resolveArchiveCompressedInputStream(path, tableId, output,
partitionsMap, FileFormat.CSV);
+ resolveArchiveCompressedInputStream(
+ new FileSourceSplit(tableId, path), output, partitionsMap,
FileFormat.CSV);
+ }
+
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+ throws IOException, FileConnectorException {
+ Map<String, String> partitionsMap =
parsePartitionsByPath(split.getFilePath());
+ resolveArchiveCompressedInputStream(split, output, partitionsMap,
FileFormat.CSV);
}
@Override
public void readProcess(
- String path,
- String tableId,
+ FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -103,11 +111,18 @@ public class CsvReadStrategy extends AbstractReadStrategy
{
actualInputStream = inputStream;
break;
}
+ // rebuild inputStream
+ if (enableSplitFile && split.getLength() > -1) {
+ actualInputStream = safeSlice(inputStream, split.getStart(),
split.getLength());
+ }
Builder builder =
CSVFormat.EXCEL.builder().setIgnoreEmptyLines(true).setDelimiter(getDelimiter());
CSVFormat csvFormat = builder.build();
- if (firstLineAsHeader) {
- csvFormat = csvFormat.withFirstRecordAsHeader();
+ // when file splitting is enabled, header rows are already excluded by the split strategy, so nothing to skip here
+ if (!enableSplitFile) {
+ if (firstLineAsHeader) {
+ csvFormat = csvFormat.withFirstRecordAsHeader();
+ }
}
try (BufferedReader reader =
new BufferedReader(new
InputStreamReader(actualInputStream, encoding));
@@ -119,12 +134,15 @@ public class CsvReadStrategy extends AbstractReadStrategy
{
reader.reset();
}
// skip lines
- for (int i = 0; i < skipHeaderNumber; i++) {
- if (reader.readLine() == null) {
- throw new IOException(
- String.format(
- "File [%s] has fewer lines than expected
to skip.",
- currentFileName));
+ // when file splitting is enabled, header rows are already excluded by the split strategy
+ if (!enableSplitFile) {
+ for (int i = 0; i < skipHeaderNumber; i++) {
+ if (reader.readLine() == null) {
+ throw new IOException(
+ String.format(
+ "File [%s] has fewer lines than
expected to skip.",
+ currentFileName));
+ }
}
}
// read lines
@@ -161,7 +179,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
seaTunnelRow.setField(index++, value);
}
}
- seaTunnelRow.setTableId(tableId);
+ seaTunnelRow.setTableId(split.getTableId());
output.collect(seaTunnelRow);
}
} catch (IOException e) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index 57d373c981..5444b16846 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -32,6 +32,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelCellUtils;
import
org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelReaderListener;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
@@ -77,19 +78,19 @@ public class ExcelReadStrategy extends AbstractReadStrategy
{
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow>
output) {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
- resolveArchiveCompressedInputStream(path, tableId, output,
partitionsMap, FileFormat.EXCEL);
+ resolveArchiveCompressedInputStream(
+ new FileSourceSplit(tableId, path), output, partitionsMap,
FileFormat.EXCEL);
}
@Override
protected void readProcess(
- String path,
- String tableId,
+ FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
String currentFileName)
throws IOException {
-
+ String tableId = split.getTableId();
if (skipHeaderNumber > Integer.MAX_VALUE || skipHeaderNumber <
Integer.MIN_VALUE) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 49612601a4..98b8de3ca2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import io.airlift.compress.lzo.LzopCodec;
@@ -78,13 +79,20 @@ public class JsonReadStrategy extends AbstractReadStrategy {
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
- resolveArchiveCompressedInputStream(path, tableId, output,
partitionsMap, FileFormat.JSON);
+ resolveArchiveCompressedInputStream(
+ new FileSourceSplit(tableId, path), output, partitionsMap,
FileFormat.JSON);
+ }
+
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+ throws IOException, FileConnectorException {
+ Map<String, String> partitionsMap =
parsePartitionsByPath(split.getFilePath());
+ resolveArchiveCompressedInputStream(split, output, partitionsMap,
FileFormat.JSON);
}
@Override
public void readProcess(
- String path,
- String tableId,
+ FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -106,6 +114,10 @@ public class JsonReadStrategy extends AbstractReadStrategy
{
actualInputStream = inputStream;
break;
}
+ // rebuild inputStream
+ if (enableSplitFile && split.getLength() > -1) {
+ actualInputStream = safeSlice(inputStream, split.getStart(),
split.getLength());
+ }
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(actualInputStream,
encoding))) {
reader.lines()
@@ -121,7 +133,7 @@ public class JsonReadStrategy extends AbstractReadStrategy {
seaTunnelRow.setField(index++,
value);
}
}
- seaTunnelRow.setTableId(tableId);
+
seaTunnelRow.setTableId(split.getTableId());
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
index 661a466e49..eef2aa2b67 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
@@ -78,7 +78,7 @@ public class MultipleTableFileSourceReader implements
SourceReader<SeaTunnelRow,
+ "]");
}
try {
- readStrategy.read(split.getFilePath(), split.getTableId(),
output);
+ readStrategy.read(split, output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s]
failed", split.splitId());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 9389223814..ed3a6152dd 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import java.io.Closeable;
import java.io.IOException;
@@ -45,6 +46,11 @@ public interface ReadStrategy extends Serializable,
Closeable {
void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException;
+ default void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+ throws IOException, FileConnectorException {
+ read(split.getFilePath(), split.getTableId(), output);
+ }
+
SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws
FileConnectorException;
default SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath,
String path)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 0c15174468..ecaca7a82f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -34,6 +34,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptio
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
@@ -168,13 +169,20 @@ public class TextReadStrategy extends
AbstractReadStrategy {
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
- resolveArchiveCompressedInputStream(path, tableId, output,
partitionsMap, FileFormat.TEXT);
+ resolveArchiveCompressedInputStream(
+ new FileSourceSplit(tableId, path), output, partitionsMap,
FileFormat.TEXT);
+ }
+
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+ throws IOException, FileConnectorException {
+ Map<String, String> partitionsMap =
parsePartitionsByPath(split.getFilePath());
+ resolveArchiveCompressedInputStream(split, output, partitionsMap,
FileFormat.TEXT);
}
@Override
public void readProcess(
- String path,
- String tableId,
+ FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -196,21 +204,27 @@ public class TextReadStrategy extends
AbstractReadStrategy {
actualInputStream = inputStream;
break;
}
-
+ // rebuild inputStream
+ if (enableSplitFile && split.getLength() > -1) {
+ actualInputStream = safeSlice(inputStream, split.getStart(),
split.getLength());
+ }
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(actualInputStream,
encoding))) {
LineProcessor lineProcessor =
line -> {
try {
- processLineData(line, tableId, output,
partitionsMap);
+ processLineData(line, split.getTableId(), output,
partitionsMap);
} catch (FileConnectorException e) {
throw new IOException(e);
}
};
-
- StreamLineSplitter splitter =
- new StreamLineSplitter(rowDelimiter, skipHeaderNumber,
lineProcessor);
+ StreamLineSplitter splitter;
+ if (enableSplitFile) {
+ splitter = new StreamLineSplitter(rowDelimiter, 0,
lineProcessor);
+ } else {
+ splitter = new StreamLineSplitter(rowDelimiter,
skipHeaderNumber, lineProcessor);
+ }
splitter.processStream(reader);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
index b1b621f451..1a9b2ba297 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -39,6 +39,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.commons.collections4.CollectionUtils;
@@ -91,13 +92,13 @@ public class XmlReadStrategy extends AbstractReadStrategy {
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws IOException, FileConnectorException {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
- resolveArchiveCompressedInputStream(path, tableId, output,
partitionsMap, FileFormat.XML);
+ resolveArchiveCompressedInputStream(
+ new FileSourceSplit(tableId, path), output, partitionsMap,
FileFormat.XML);
}
@Override
public void readProcess(
- String path,
- String tableId,
+ FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -109,7 +110,9 @@ public class XmlReadStrategy extends AbstractReadStrategy {
document = saxReader.read(new InputStreamReader(inputStream,
encoding));
} catch (DocumentException e) {
throw new FileConnectorException(
- FileConnectorErrorCode.FILE_READ_FAILED, "Failed to read
xml file: " + path, e);
+ FileConnectorErrorCode.FILE_READ_FAILED,
+ "Failed to read xml file: " + split.getFilePath(),
+ e);
}
Element rootElement = document.getRootElement();
@@ -161,7 +164,7 @@ public class XmlReadStrategy extends AbstractReadStrategy {
}
}
- seaTunnelRow.setTableId(tableId);
+ seaTunnelRow.setTableId(split.getTableId());
output.collect(seaTunnelRow);
});
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
new file mode 100644
index 0000000000..11117fef88
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class AccordingToSplitSizeSplitStrategy implements
FileSplitStrategy {
+
+ private final long skipHeaderRowNumber;
+ private final long splitSize;
+ private final byte[] delimiterBytes;
+ private static final int BUFFER_SIZE = 64 * 1024;
+
+ public AccordingToSplitSizeSplitStrategy(
+ String rowDelimiter, long skipHeaderRowNumber, String
encodingName, long splitSize) {
+ this.skipHeaderRowNumber = skipHeaderRowNumber;
+ this.splitSize = splitSize;
+ this.delimiterBytes =
rowDelimiter.getBytes(Charset.forName(encodingName));
+ }
+
+ @Override
+ public List<FileSourceSplit> split(String tableId, String filePath) {
+ List<FileSourceSplit> splits = new ArrayList<>();
+ long fileSize = safeGetFileSize(filePath);
+ if (fileSize == 0) {
+ return splits;
+ }
+ long currentStart = 0;
+ if (skipHeaderRowNumber > 0) {
+ currentStart = skipHeaderWithBuffer(filePath, skipHeaderRowNumber);
+ }
+ while (currentStart < fileSize) {
+ long tentativeEnd = currentStart + splitSize;
+ if (tentativeEnd >= fileSize) {
+ splits.add(
+ new FileSourceSplit(
+ tableId, filePath, currentStart, fileSize -
currentStart));
+ break;
+ }
+ long actualEnd = findNextDelimiterWithBuffer(filePath,
tentativeEnd);
+ if (actualEnd <= currentStart) {
+ actualEnd = tentativeEnd;
+ }
+ splits.add(
+ new FileSourceSplit(tableId, filePath, currentStart,
actualEnd - currentStart));
+ currentStart = actualEnd;
+ }
+ return splits;
+ }
+
+ protected abstract InputStream getInputStream(String filePath) throws
IOException;
+
+ protected abstract long getFileSize(String filePath) throws IOException;
+
+ private long safeGetFileSize(String filePath) {
+ try {
+ return getFileSize(filePath);
+ } catch (IOException e) {
+ throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
+ }
+ }
+
+ private long skipHeaderWithBuffer(String filePath, long skipLines) {
+ try (InputStream input = getInputStream(filePath)) {
+ return skipLinesUsingBuffer(input, skipLines);
+ } catch (IOException e) {
+ throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
+ }
+ }
+
+ private long skipLinesUsingBuffer(InputStream is, long skipLines) throws
IOException {
+ byte[] buffer = new byte[BUFFER_SIZE];
+ long matched = 0;
+ long lines = 0;
+ long pos = 0;
+ int n;
+ while ((n = is.read(buffer)) != -1) {
+ for (int i = 0; i < n; i++) {
+ pos++;
+ if (buffer[i] == delimiterBytes[(int) matched]) {
+ matched++;
+ if (matched == delimiterBytes.length) {
+ matched = 0;
+ lines++;
+ if (lines >= skipLines) {
+ return pos;
+ }
+ }
+ } else {
+ matched = 0;
+ }
+ }
+ }
+
+ return pos;
+ }
+
+ private long findNextDelimiterWithBuffer(String filePath, long startPos) {
+ try (InputStream is = getInputStream(filePath)) {
+ long skipped = skipManually(is, startPos);
+ if (skipped < startPos) {
+ return startPos;
+ }
+ byte[] buffer = new byte[BUFFER_SIZE];
+ long matched = 0;
+ long pos = startPos;
+ int n;
+ while ((n = is.read(buffer)) != -1) {
+ for (int i = 0; i < n; i++) {
+ pos++;
+ if (buffer[i] == delimiterBytes[(int) matched]) {
+ matched++;
+ if (matched == delimiterBytes.length) {
+ return pos;
+ }
+ } else {
+ matched = 0;
+ }
+ }
+ }
+ return pos;
+
+ } catch (IOException e) {
+ throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
+ }
+ }
+
+ private long skipManually(InputStream is, long bytesToSkip) throws
IOException {
+ byte[] buffer = new byte[BUFFER_SIZE];
+ long total = 0;
+ while (total < bytesToSkip) {
+ long toRead = Math.min(buffer.length, bytesToSkip - total);
+ int n = is.read(buffer, 0, (int) toRead);
+ if (n == -1) break;
+ total += n;
+ }
+ return total;
+ }
+}
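To make the boundary alignment above concrete, here is a small self-contained sketch (the in-memory data and the anonymous subclass are invented for illustration; the connector-file-local subclass added later in this commit plays the same role against real files):

import org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class SplitAlignmentSketch {
    public static void main(String[] args) {
        // an in-memory "file": one header row plus three data rows, 23 bytes in total
        byte[] data = "id,name\n1,a\n2,bb\n3,ccc\n".getBytes(StandardCharsets.UTF_8);

        // row delimiter "\n", skip 1 header row, utf-8, target split size of 6 bytes
        AccordingToSplitSizeSplitStrategy strategy =
                new AccordingToSplitSizeSplitStrategy("\n", 1L, "utf-8", 6L) {
                    @Override
                    protected InputStream getInputStream(String filePath) {
                        return new ByteArrayInputStream(data);
                    }

                    @Override
                    protected long getFileSize(String filePath) {
                        return data.length;
                    }
                };

        List<FileSourceSplit> splits = strategy.split("demo.table", "in-memory.csv");
        // The header (8 bytes) is skipped and each split ends on a '\n' boundary:
        //   start=8,  length=9  -> "1,a\n2,bb\n"
        //   start=17, length=6  -> "3,ccc\n"
        splits.forEach(s -> System.out.println("start=" + s.getStart() + " length=" + s.getLength()));
    }
}

Because every split after the header ends exactly on the row delimiter, each reader can parse its byte range independently without seeing partial rows.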
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/DefaultFileSplitStrategy.java
similarity index 52%
copy from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/DefaultFileSplitStrategy.java
index 9d19cf212e..964b4a98f6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/DefaultFileSplitStrategy.java
@@ -14,22 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
-package org.apache.seatunnel.connectors.seatunnel.file.local.source;
+import java.util.Collections;
+import java.util.List;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
-
-public class LocalFileSource extends BaseMultipleTableFileSource {
-
- public LocalFileSource(ReadonlyConfig readonlyConfig) {
- super(new MultipleTableLocalFileSourceConfig(readonlyConfig));
- }
-
- @Override
- public String getPluginName() {
- return FileSystemType.LOCAL.getFileSystemPluginName();
+public class DefaultFileSplitStrategy implements FileSplitStrategy {
+ public List<FileSourceSplit> split(String tableId, String filePath) {
+ return Collections.singletonList(new FileSourceSplit(tableId,
filePath));
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
index 4c938ad545..fea28898b0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
@@ -28,6 +28,8 @@ public class FileSourceSplit implements SourceSplit {
@Getter private final String tableId;
@Getter private final String filePath;
+ @Getter private long start = 0;
+ @Getter private long length = -1;
public FileSourceSplit(String splitId) {
this.filePath = splitId;
@@ -39,6 +41,13 @@ public class FileSourceSplit implements SourceSplit {
this.filePath = filePath;
}
+ public FileSourceSplit(String tableId, String filePath, long start, long
length) {
+ this.tableId = tableId;
+ this.filePath = filePath;
+ this.start = start;
+ this.length = length;
+ }
+
@Override
public String splitId() {
// In order to be compatible with the split before the upgrade, when
tableId is null,
@@ -46,7 +55,7 @@ public class FileSourceSplit implements SourceSplit {
if (tableId == null) {
return filePath;
}
- return tableId + "_" + filePath;
+ return tableId + "_" + filePath + "_" + start;
}
@Override
@@ -58,11 +67,14 @@ public class FileSourceSplit implements SourceSplit {
return false;
}
FileSourceSplit that = (FileSourceSplit) o;
- return Objects.equals(tableId, that.tableId) &&
Objects.equals(filePath, that.filePath);
+ return Objects.equals(tableId, that.tableId)
+ && Objects.equals(filePath, that.filePath)
+ && Objects.equals(start, that.start)
+ && Objects.equals(length, that.length);
}
@Override
public int hashCode() {
- return Objects.hash(tableId, filePath);
+ return Objects.hash(tableId, filePath, start, length);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
similarity index 52%
copy from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
index 9d19cf212e..12f7a4746f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
@@ -14,22 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
-package org.apache.seatunnel.connectors.seatunnel.file.local.source;
+import java.io.Serializable;
+import java.util.List;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
+public interface FileSplitStrategy extends Serializable {
-public class LocalFileSource extends BaseMultipleTableFileSource {
-
- public LocalFileSource(ReadonlyConfig readonlyConfig) {
- super(new MultipleTableLocalFileSourceConfig(readonlyConfig));
- }
-
- @Override
- public String getPluginName() {
- return FileSystemType.LOCAL.getFileSystemPluginName();
- }
+ List<FileSourceSplit> split(String tableId, String filePath);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
index bfa6024479..31b46140d5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
@@ -47,10 +47,12 @@ public class MultipleTableFileSourceSplitEnumerator
private final Map<String, List<String>> filePathMap;
private final AtomicInteger assignCount = new AtomicInteger(0);
private final Object lock = new Object();
+ private final FileSplitStrategy fileSplitStrategy;
public MultipleTableFileSourceSplitEnumerator(
Context<FileSourceSplit> context,
- BaseMultipleTableFileSourceConfig multipleTableFileSourceConfig) {
+ BaseMultipleTableFileSourceConfig multipleTableFileSourceConfig,
+ FileSplitStrategy fileSplitStrategy) {
this.context = context;
this.filePathMap =
multipleTableFileSourceConfig.getFileSourceConfigs().stream()
@@ -65,13 +67,14 @@ public class MultipleTableFileSourceSplitEnumerator
BaseFileSourceConfig::getFilePaths));
this.assignedSplit = new HashSet<>();
this.allSplit = new
TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
+ this.fileSplitStrategy = fileSplitStrategy;
}
public MultipleTableFileSourceSplitEnumerator(
Context<FileSourceSplit> context,
BaseMultipleTableFileSourceConfig multipleTableFileSourceConfig,
FileSourceState fileSourceState) {
- this(context, multipleTableFileSourceConfig);
+ this(context, multipleTableFileSourceConfig, new
DefaultFileSplitStrategy());
this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
}
@@ -81,7 +84,7 @@ public class MultipleTableFileSourceSplitEnumerator
String tableId = filePathEntry.getKey();
List<String> filePaths = filePathEntry.getValue();
for (String filePath : filePaths) {
- allSplit.add(new FileSourceSplit(tableId, filePath));
+ allSplit.addAll(fileSplitStrategy.split(tableId, filePath));
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
index debdbc5fcf..8e4a35f704 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
@@ -78,7 +78,7 @@ public class MultipleTableFileSourceSplitEnumeratorTest {
Mockito.when(context.currentParallelism()).thenReturn(parallelism);
MultipleTableFileSourceSplitEnumerator enumerator =
new MultipleTableFileSourceSplitEnumerator(
- context, baseMultipleTableFileSourceConfig);
+ context, baseMultipleTableFileSourceConfig, new
DefaultFileSplitStrategy());
enumerator.open();
Assertions.assertEquals(50, enumerator.currentUnassignedSplitSize());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 9d19cf212e..f5e3ff74b5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -18,18 +18,44 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
public class LocalFileSource extends BaseMultipleTableFileSource {
public LocalFileSource(ReadonlyConfig readonlyConfig) {
- super(new MultipleTableLocalFileSourceConfig(readonlyConfig));
+ super(
+ new MultipleTableLocalFileSourceConfig(readonlyConfig),
+ initFileSplitStrategy(readonlyConfig));
}
@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
}
+
+ private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig
readonlyConfig) {
+ if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
+ return new DefaultFileSplitStrategy();
+ }
+ String rowDelimiter =
+
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
+ ? DEFAULT_ROW_DELIMITER
+ :
readonlyConfig.get(FileBaseSourceOptions.ROW_DELIMITER);
+ long skipHeaderRowNumber =
+ readonlyConfig.get(FileBaseSourceOptions.CSV_USE_HEADER_LINE)
+ ? 1L
+ :
readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
+ String encodingName =
readonlyConfig.get(FileBaseSourceOptions.ENCODING);
+ long splitSize =
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
+ return new LocalFileAccordingToSplitSizeSplitStrategy(
+ rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 2f1c6a0347..e6e0eabe07 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -84,6 +84,14 @@ public class LocalFileSourceFactory implements
TableSourceFactory {
Arrays.asList(
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
+ .conditional(
+ FileBaseSourceOptions.FILE_FORMAT_TYPE,
+ Arrays.asList(FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV),
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT)
+ .conditional(
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT,
+ Boolean.TRUE,
+ FileBaseSourceOptions.FILE_SPLIT_SIZE)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
.optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
new file mode 100644
index 0000000000..3a6cb18b13
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
+
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class LocalFileAccordingToSplitSizeSplitStrategy extends
AccordingToSplitSizeSplitStrategy {
+
+ public LocalFileAccordingToSplitSizeSplitStrategy(
+ String rowDelimiter, long skipHeaderRowNumber, String
encodingName, long splitSize) {
+ super(rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
+ }
+
+ @Override
+ protected InputStream getInputStream(String filePath) throws IOException {
+ Path path = toLocalNioPath(filePath);
+ return new BufferedInputStream(Files.newInputStream(path));
+ }
+
+ @Override
+ protected long getFileSize(String filePath) throws IOException {
+ Path path = toLocalNioPath(filePath);
+ return Files.size(path);
+ }
+
+ private static Path toLocalNioPath(String filePath) {
+ try {
+ URI uri = URI.create(filePath);
+ if ("file".equalsIgnoreCase(uri.getScheme())) {
+ return Paths.get(uri);
+ }
+ } catch (Exception ignored) {
+ // ignore malformed URI
+ }
+ return Paths.get(filePath);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
new file mode 100644
index 0000000000..c47aa7214d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.local;
+
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import lombok.SneakyThrows;
+
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class SplitFileStrategyTest {
+
+    @DisabledOnOs(
+            value = OS.WINDOWS,
+            disabledReason =
+                    "On Windows the text file's newline is '\\r\\n', so the byte offsets expected here (computed for '\\n') no longer match and the test would fail.")
+    @SneakyThrows
+    @Test
+    public void testSplitNoSkipHeader() {
+        final LocalFileAccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 0L, "utf-8", 100L);
+        URL url = getClass().getClassLoader().getResource("test_split_csv_data.csv");
+        String realPath = Paths.get(url.toURI()).toString();
+        final List<FileSourceSplit> splits = localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(2, splits.size());
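+        // the 100-byte target is extended to the next row boundary at byte 105, leaving 85 bytes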
+ // check split-1
+ Assertions.assertEquals(0, splits.get(0).getStart());
+ Assertions.assertEquals(105, splits.get(0).getLength());
+ // check split-2
+ Assertions.assertEquals(105, splits.get(1).getStart());
+ Assertions.assertEquals(85, splits.get(1).getLength());
+ }
+
+    @DisabledOnOs(
+            value = OS.WINDOWS,
+            disabledReason =
+                    "On Windows the text file's newline is '\\r\\n', so the byte offsets expected here (computed for '\\n') no longer match and the test would fail.")
+    @SneakyThrows
+    @Test
+    public void testSplitSkipHeader() {
+        final LocalFileAccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, "utf-8", 30L);
+        URL url = getClass().getClassLoader().getResource("test_split_csv_data.csv");
+        String realPath = Paths.get(url.toURI()).toString();
+        final List<FileSourceSplit> splits = localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(4, splits.size());
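+        // the 21-byte header is skipped; each split grows to the next row boundary past the 30-byte target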
+ // check split-1
+ Assertions.assertEquals(21, splits.get(0).getStart());
+ Assertions.assertEquals(41, splits.get(0).getLength());
+ // check split-2
+ Assertions.assertEquals(62, splits.get(1).getStart());
+ Assertions.assertEquals(43, splits.get(1).getLength());
+ // check split-3
+ Assertions.assertEquals(105, splits.get(2).getStart());
+ Assertions.assertEquals(43, splits.get(2).getLength());
+ // check split-4
+ Assertions.assertEquals(148, splits.get(3).getStart());
+ Assertions.assertEquals(42, splits.get(3).getLength());
+ }
+
+    @DisabledOnOs(
+            value = OS.WINDOWS,
+            disabledReason =
+                    "On Windows the text file's newline is '\\r\\n', so the byte offsets expected here (computed for '\\n') no longer match and the test would fail.")
+    @SneakyThrows
+    @Test
+    public void testSplitSkipHeaderLargeSize() {
+        final LocalFileAccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, "utf-8", 300L);
+        URL url = getClass().getClassLoader().getResource("test_split_csv_data.csv");
+        String realPath = Paths.get(url.toURI()).toString();
+        final List<FileSourceSplit> splits = localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(1, splits.size());
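+        // the 300-byte target exceeds the 169 bytes left after the header, so the file stays in one split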
+ // check split-1
+ Assertions.assertEquals(21, splits.get(0).getStart());
+ Assertions.assertEquals(169, splits.get(0).getLength());
+ }
+
+    @DisabledOnOs(
+            value = OS.WINDOWS,
+            disabledReason =
+                    "On Windows the text file's newline is '\\r\\n', so the byte offsets expected here (computed for '\\n') no longer match and the test would fail.")
+    @SneakyThrows
+    @Test
+    public void testSplitSkipHeaderSmallSize() {
+        final LocalFileAccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, "utf-8", 3L);
+        URL url = getClass().getClassLoader().getResource("test_split_csv_data.csv");
+        String realPath = Paths.get(url.toURI()).toString();
+        final List<FileSourceSplit> splits = localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(8, splits.size());
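+        // a 3-byte target is smaller than any row, so each of the 8 data rows becomes its own split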
+ // check split
+ Assertions.assertEquals(21, splits.get(0).getStart());
+ Assertions.assertEquals(42, splits.get(1).getStart());
+ Assertions.assertEquals(62, splits.get(2).getStart());
+ Assertions.assertEquals(82, splits.get(3).getStart());
+ Assertions.assertEquals(105, splits.get(4).getStart());
+ Assertions.assertEquals(126, splits.get(5).getStart());
+ Assertions.assertEquals(148, splits.get(6).getStart());
+ Assertions.assertEquals(169, splits.get(7).getStart());
+ }
+
+ @SneakyThrows
+ @Test
+ public void testSplitSkipHeaderSpecialRowDelimiter() {
+        final LocalFileAccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new LocalFileAccordingToSplitSizeSplitStrategy("|^|", 1L, "utf-8", 80L);
+        URL url =
+                getClass()
+                        .getClassLoader()
+                        .getResource("test_split_special_row_delimiter_data.txt");
+        String realPath = Paths.get(url.toURI()).toString();
+        final List<FileSourceSplit> splits = localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(2, splits.size());
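+        // the header plus its "|^|" delimiter occupies 23 bytes; each boundary falls right after a delimiter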
+ // check split-1
+ Assertions.assertEquals(23, splits.get(0).getStart());
+ Assertions.assertEquals(92, splits.get(0).getLength());
+ // check split-2
+ Assertions.assertEquals(115, splits.get(1).getStart());
+ Assertions.assertEquals(91, splits.get(1).getLength());
+ }
+
+ @SneakyThrows
+ @Test
+ public void testSplitEmpty() {
+        final LocalFileAccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, "utf-8", 300L);
+        URL url = getClass().getClassLoader().getResource("test_split_empty_data.csv");
+        String realPath = Paths.get(url.toURI()).toString();
+        final List<FileSourceSplit> splits = localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(0, splits.size());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_csv_data.csv b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_csv_data.csv
new file mode 100644
index 0000000000..5b181503da
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_csv_data.csv
@@ -0,0 +1,9 @@
+id,name,password,age
+1,Tom,12345678910,18
+2,Jack,987654321,17
+3,Rose,135792468,19
+4,ZhangSan,09090909,16
+5,LiSi,w12354654w,20
+6,WangEr,tt7654321,18
+7,John,yy31415926,19
+8,LaoWang,ww123456,20
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_empty_data.csv b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_empty_data.csv
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_special_row_delimiter_data.txt b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_special_row_delimiter_data.txt
new file mode 100644
index 0000000000..038f41512e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/resources/test_split_special_row_delimiter_data.txt
@@ -0,0 +1 @@
+id,name,password,age|^|1,Tom,12345678910,18|^|2,Jack,987654321,17|^|3,Rose,135792468,19|^|4,ZhangSan,09090909,16|^|5,LiSi,w12354654w,20|^|6,WangEr,tt7654321,18|^|7,John,yy31415926,19|^|8,LaoWang,ww123456,20
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
index 9ea6f1e632..bb5ba33ffd 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
@@ -80,7 +80,7 @@ public class MultipleTableHiveSourceReader implements SourceReader<SeaTunnelRow,
+ "]");
}
try {
- readStrategy.read(split.getFilePath(), split.getTableId(), output);
+ readStrategy.read(split, output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s] failed", split.splitId());
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index f7a5fa6a11..e29448a1b7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -323,6 +323,7 @@ public class LocalFileIT extends TestSuiteBase {
TestHelper helper = new TestHelper(container);
helper.execute("/csv/fake_to_local_csv.conf");
helper.execute("/csv/local_csv_to_assert.conf");
+ helper.execute("/csv/local_csv_enable_split_to_assert.conf");
helper.execute("/csv/csv_with_header_to_assert.conf");
helper.execute("/csv/breakline_csv_to_assert.conf");
helper.execute("/excel/fake_to_local_excel.conf");
@@ -350,6 +351,7 @@ public class LocalFileIT extends TestSuiteBase {
helper.execute("/json/fake_to_local_file_json.conf");
// test read local json file
helper.execute("/json/local_file_json_to_assert.conf");
+ helper.execute("/json/local_file_json_enable_split_to_assert.conf");
helper.execute("/json/local_file_json_lzo_to_console.conf");
// test read local json file with assigning encoding
helper.execute("/json/fake_to_local_file_json_with_encoding.conf");
@@ -396,6 +398,7 @@ public class LocalFileIT extends TestSuiteBase {
helper.execute("/text/local_file_multi_zip_text_to_assert.conf");
// test read single local text file with tar compression
helper.execute("/text/local_file_tar_text_to_assert.conf");
+ helper.execute("/text/local_file_text_enable_split_to_assert.conf");
// test read multi local text file with tar compression
helper.execute("/text/local_file_multi_tar_text_to_assert.conf");
// test read single local text file with tar.gz compression
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/local_csv_enable_split_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/local_csv_enable_split_to_assert.conf
new file mode 100644
index 0000000000..3b37a6e971
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/local_csv_enable_split_to_assert.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 3
+ job.mode = "BATCH"
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/tmp/csv/seatunnel"
+ plugin_output = "fake"
+ file_format_type = csv
+ field_delimiter = ","
+ row_delimiter = "\n"
+ skip_header_row_number = 1
+ enable_file_split = true
+ file_split_size = 3
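+    # a deliberately tiny split size so even this small fixture ends up as roughly one split per row,
+    # shared across the 3 parallel readers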
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp,
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_enable_split_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_enable_split_to_assert.conf
new file mode 100644
index 0000000000..04a5aaed50
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_enable_split_to_assert.conf
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 3
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/json"
+ file_format_type = "json"
+ enable_file_split = true
+ file_split_size = 3
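+    # tiny split size so the json fixture is read as multiple row-aligned splits by the 3 parallel readers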
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map<string, string>"
+ C_ARRAY = "array<int>"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ plugin_output = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = hobby
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_enable_split_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_enable_split_to_assert.conf
new file mode 100644
index 0000000000..d9304a394f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_enable_split_to_assert.conf
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 3
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/text"
+ file_format_type = "text"
+ enable_file_split = true
+ file_split_size = 3
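+    # tiny split size so the text fixture is read as multiple row-aligned splits by the 3 parallel readers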
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ plugin_output = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = hobby
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file