This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 10b9c74989 [core] format table: support file split (#6556)
10b9c74989 is described below
commit 10b9c749897db97dd030842a7f3fe6292a8f1361
Author: jerry <[email protected]>
AuthorDate: Mon Nov 10 18:57:52 2025 +0800
[core] format table: support file split (#6556)
---
.../org/apache/paimon/io/DataFileRecordReader.java | 18 +++
.../paimon/table/format/FormatDataSplit.java | 30 +++--
.../paimon/table/format/FormatReadBuilder.java | 34 +++---
.../paimon/table/format/FormatTableScan.java | 48 +++++++-
.../org/apache/paimon/catalog/CatalogTestBase.java | 42 +++++++
.../paimon/table/format/FormatDataSplitTest.java | 24 ++--
.../paimon/table/format/FormatReadBuilderTest.java | 101 ++++++++++++++++
.../paimon/table/format/FormatTableScanTest.java | 134 +++++++++++++++++++++
.../paimon/spark/FormatTableStatistics.scala | 12 +-
9 files changed, 412 insertions(+), 31 deletions(-)
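In short: a FormatDataSplit now records the total file size plus an optional
byte range (offset, length). Uncompressed CSV/JSON files larger than
source.split.target-size are cut into range splits; everything else stays a
single whole-file split. Below is a minimal sketch of the two split shapes,
using the constructors added in this patch (path, sizes, and range values are
made up for illustration):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.table.format.FormatDataSplit;

    public class FormatSplitSketch {
        public static void main(String[] args) {
            Path filePath = new Path("/tmp/warehouse/t/data1.csv"); // hypothetical file
            long fileSize = 250L;
            BinaryRow partition = null; // unpartitioned table

            // Whole-file split: offset 0, length null, the reader consumes the full file.
            FormatDataSplit whole = new FormatDataSplit(filePath, fileSize, partition);

            // Range split: covers bytes [100, 200) of the same file.
            FormatDataSplit range = new FormatDataSplit(filePath, fileSize, 100L, 100L, partition);

            System.out.println(whole.length()); // null
            System.out.println(range.offset() + ", " + range.length()); // 100, 100
        }
    }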
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 5f8a217b98..3f9e377a6b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -39,6 +39,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
/** Reads {@link InternalRow} from data files. */
@@ -84,6 +85,23 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
this.selection = context.selection();
}
+ public DataFileRecordReader(
+ RowType tableRowType,
+ FileRecordReader<InternalRow> reader,
+ @Nullable PartitionInfo partitionInfo)
+ throws IOException {
+ this.tableRowType = tableRowType;
+ this.reader = reader;
+ this.indexMapping = null;
+ this.partitionInfo = partitionInfo;
+ this.castMapping = null;
+ this.rowTrackingEnabled = false;
+ this.firstRowId = null;
+ this.maxSequenceNumber = 0L;
+ this.systemFields = Collections.emptyMap();
+ this.selection = null;
+ }
+
@Nullable
@Override
public FileRecordIterator<InternalRow> readBatch() throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index 86b26a1351..e228b043f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -29,33 +29,48 @@ import java.util.Objects;
/** {@link FormatDataSplit} for format table. */
public class FormatDataSplit implements Split {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final Path filePath;
+ private final long fileSize;
private final long offset;
- private final long length;
+ @Nullable private final Long length;
@Nullable private final BinaryRow partition;
- public FormatDataSplit(Path filePath, long offset, long length, @Nullable BinaryRow partition) {
+ public FormatDataSplit(
+ Path filePath,
+ long fileSize,
+ long offset,
+ @Nullable Long length,
+ @Nullable BinaryRow partition) {
this.filePath = filePath;
+ this.fileSize = fileSize;
this.offset = offset;
this.length = length;
this.partition = partition;
}
+ public FormatDataSplit(Path filePath, long fileSize, @Nullable BinaryRow partition) {
+ this(filePath, fileSize, 0L, null, partition);
+ }
+
public Path filePath() {
return this.filePath;
}
public Path dataPath() {
- return filePath;
+ return this.filePath;
+ }
+
+ public long fileSize() {
+ return this.fileSize;
}
public long offset() {
return offset;
}
- public long length() {
+ public Long length() {
return length;
}
@@ -78,13 +93,14 @@ public class FormatDataSplit implements Split {
}
FormatDataSplit that = (FormatDataSplit) o;
return offset == that.offset
- && length == that.length
+ && fileSize == that.fileSize
+ && Objects.equals(length, that.length)
&& Objects.equals(filePath, that.filePath)
&& Objects.equals(partition, that.partition);
}
@Override
public int hashCode() {
- return Objects.hash(filePath, offset, length, partition);
+ return Objects.hash(filePath, fileSize, offset, length, partition);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 81ff015769..54bba0d6a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -30,6 +30,7 @@ import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.source.ReadBuilder;
@@ -37,13 +38,13 @@ import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -157,7 +158,7 @@ public class FormatReadBuilder implements ReadBuilder {
protected RecordReader<InternalRow> createReader(FormatDataSplit dataSplit) throws IOException {
Path filePath = dataSplit.dataPath();
FormatReaderContext formatReaderContext =
- new FormatReaderContext(table.fileIO(), filePath, dataSplit.length(), null);
+ new FormatReaderContext(table.fileIO(), filePath, dataSplit.fileSize(), null);
// Skip pushing down partition filters to reader.
List<Predicate> readFilters =
excludePredicateWithFields(
@@ -172,18 +173,23 @@ public class FormatReadBuilder implements ReadBuilder {
Pair<int[], RowType> partitionMapping =
PartitionUtils.getPartitionMapping(
table.partitionKeys(), readType().getFields(), table.partitionType());
-
- return new DataFileRecordReader(
- readType(),
- readerFactory,
- formatReaderContext,
- null,
- null,
- PartitionUtils.create(partitionMapping, dataSplit.partition()),
- false,
- null,
- 0,
- Collections.emptyMap());
+ try {
+ FileRecordReader<InternalRow> reader;
+ if (dataSplit.length() != null) {
+ reader =
+ readerFactory.createReader(
+ formatReaderContext, dataSplit.offset(), dataSplit.length());
+ } else {
+ reader = readerFactory.createReader(formatReaderContext);
+ }
+ return new DataFileRecordReader(
+ readType(),
+ reader,
+ PartitionUtils.create(partitionMapping, dataSplit.partition()));
+ } catch (Exception e) {
+ FileUtils.checkExists(formatReaderContext.fileIO(), formatReaderContext.filePath());
+ throw e;
+ }
}
private static RowType getRowTypeWithoutPartition(RowType rowType, List<String> partitionKeys) {
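The reader construction above reduces to a null check on the split's length.
A reduced sketch of just that branch, assuming the factory is Paimon's
FormatReaderFactory and the context is the FormatReaderContext built in
createReader (error handling elided):

    // Sketch only: ranged read when the split carries a length, full read otherwise.
    static FileRecordReader<InternalRow> openReader(
            FormatReaderFactory factory, FormatReaderContext context, FormatDataSplit split)
            throws IOException {
        return split.length() != null
                ? factory.createReader(context, split.offset(), split.length()) // byte range
                : factory.createReader(context); // whole file
    }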
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index ac4c29d134..96c3805470 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -22,6 +22,8 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.format.json.JsonOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
@@ -240,14 +242,58 @@ public class FormatTableScan implements InnerTableScan {
FileStatus[] files = fileIO.listFiles(path, true);
for (FileStatus file : files) {
if (isDataFileName(file.getPath().getName())) {
+ List<FormatDataSplit> fileSplits =
+ tryToSplitLargeFile(
+ table.format(), file, coreOptions.splitTargetSize(), partition);
+ splits.addAll(fileSplits);
+ }
+ }
+ return splits;
+ }
+
+ private List<FormatDataSplit> tryToSplitLargeFile(
+ FormatTable.Format format, FileStatus file, long maxSplitBytes, BinaryRow partition) {
+ boolean isSplittableFile =
+ ((format == FormatTable.Format.CSV
+ && !table.options()
+ .containsKey(CsvOptions.LINE_DELIMITER.key()))
+ || (format == FormatTable.Format.JSON
+ && !table.options()
+ .containsKey(JsonOptions.LINE_DELIMITER.key()))
+ && isTextFileUncompressed(file.getPath().getName());
+ List<FormatDataSplit> splits = new ArrayList<>();
+ if (isSplittableFile && file.getLen() > maxSplitBytes) {
+ long remainingBytes = file.getLen();
+ long currentStart = 0;
+
+ while (remainingBytes > 0) {
+ long splitSize = Math.min(maxSplitBytes, remainingBytes);
+
FormatDataSplit split =
- new FormatDataSplit(file.getPath(), 0, file.getLen(), partition);
+ new FormatDataSplit(
+ file.getPath(), file.getLen(), currentStart, splitSize, partition);
splits.add(split);
+ currentStart += splitSize;
+ remainingBytes -= splitSize;
}
+ } else {
+ splits.add(new FormatDataSplit(file.getPath(), file.getLen(), partition));
}
return splits;
}
+ private static boolean isTextFileUncompressed(String fileName) {
+ if (fileName == null || fileName.trim().isEmpty()) {
+ return false;
+ }
+ String[] parts = fileName.split("\\.");
+ if (parts.length < 2) {
+ return false;
+ }
+ String lastExt = parts[parts.length - 1].toLowerCase();
+ return "csv".equals(lastExt) || "json".equals(lastExt);
+ }
+
public static Map<String, String> extractLeadingEqualityPartitionSpecWhenOnlyAnd(
List<String> partitionKeys, Predicate predicate) {
List<Predicate> predicates = PredicateBuilder.splitAnd(predicate);
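The loop in tryToSplitLargeFile is plain ceiling division over the file
length. A standalone illustration (values made up; the real target comes from
source.split.target-size), consistent with the new scan test where files of
80/120/200 bytes and a 100-byte target yield 1 + 2 + 2 = 5 splits:

    public class SplitArithmeticSketch {
        public static void main(String[] args) {
            long fileLen = 250L; // made-up file length
            long target = 100L;  // made-up split target size
            long start = 0L;
            long remaining = fileLen;
            while (remaining > 0) {
                long size = Math.min(target, remaining);
                System.out.printf("split offset=%d length=%d%n", start, size);
                start += size;
                remaining -= size;
            }
            // Prints [0,100), [100,200), [200,250): three splits for a 250-byte file.
        }
    }

Only uncompressed CSV/JSON files without a custom line delimiter qualify,
presumably because the text reader can resynchronize on default line
boundaries at an arbitrary byte offset; compressed text files and Parquet
keep a single whole-file split.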
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 4ddd6ab05f..2934cc3dbe 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -824,6 +824,48 @@ public abstract class CatalogTestBase {
catalog.dropTable(pid, true);
}
+ @Test
+ public void testFormatTableSplitRead() throws Exception {
+ if (!supportsFormatTable()) {
+ return;
+ }
+ Pair[] format2Compressions = {
+ Pair.of("csv", HadoopCompressionType.NONE),
+ Pair.of("json", HadoopCompressionType.NONE),
+ Pair.of("csv", HadoopCompressionType.GZIP),
+ Pair.of("json", HadoopCompressionType.GZIP),
+ Pair.of("parquet", HadoopCompressionType.ZSTD)
+ };
+ for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
+ String format = format2Compression.getKey();
+ String compression = format2Compression.getValue().value();
+ String dbName = format + "_split_db_" + compression;
+ catalog.createDatabase(dbName, true);
+
+ Identifier id = Identifier.create(dbName, format + "_split_table_"
+ compression);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("score", DataTypes.DOUBLE())
+ .options(getFormatTableOptions())
+ .option("file.format", format)
+ .option("source.split.target-size", "54 B")
+ .option("file.compression", compression.toString())
+ .build();
+ catalog.createTable(id, schema, true);
+ FormatTable table = (FormatTable) catalog.getTable(id);
+ int size = 50;
+ InternalRow[] datas = new InternalRow[size];
+ for (int i = 0; i < size; i++) {
+ datas[i] = GenericRow.of(i, BinaryString.fromString("User" +
i), 85.5 + (i % 15));
+ }
+ writeAndCheckCommitFormatTable(table, datas, null);
+ List<InternalRow> allRows = read(table, null, null, null, null);
+ assertThat(allRows).containsExactlyInAnyOrder(datas);
+ }
+ }
+
private void writeAndCheckCommitFormatTable(
FormatTable table, InternalRow[] datas, InternalRow dataWithDiffPartition)
throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
index d992796875..601941063d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
@@ -46,11 +46,7 @@ public class FormatDataSplitTest {
Predicate predicate = builder.equal(0, 5);
// Create FormatDataSplit
- FormatDataSplit split =
- new FormatDataSplit(
- filePath, 100L, // offset
- 1024L, // length
- null);
+ FormatDataSplit split = new FormatDataSplit(filePath, 1024L, null);
// Test Java serialization
byte[] serialized = InstantiationUtil.serializeObject(split);
@@ -58,8 +54,20 @@ public class FormatDataSplitTest {
InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
// Verify the deserialized object
- assertThat(deserialized.filePath()).isEqualTo(filePath);
- assertThat(deserialized.offset()).isEqualTo(100L);
- assertThat(deserialized.length()).isEqualTo(1024L);
+ assertThat(deserialized.filePath()).isEqualTo(split.filePath());
+ assertThat(deserialized.offset()).isEqualTo(split.offset());
+ assertThat(deserialized.fileSize()).isEqualTo(split.fileSize());
+ assertThat(deserialized.length()).isEqualTo(split.length());
+
+ split = new FormatDataSplit(filePath, 1024L, 100L, 512L, null);
+
+ serialized = InstantiationUtil.serializeObject(split);
+ deserialized = InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+ // Verify the deserialized object
+ assertThat(deserialized.filePath()).isEqualTo(split.filePath());
+ assertThat(deserialized.offset()).isEqualTo(split.offset());
+ assertThat(deserialized.fileSize()).isEqualTo(split.fileSize());
+ assertThat(deserialized.length()).isEqualTo(split.length());
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
index 45cff37f3e..7b441e691d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
@@ -19,10 +19,21 @@
package org.apache.paimon.table.format;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.csv.CsvFileFormat;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -32,8 +43,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -122,4 +135,92 @@ public class FormatReadBuilderTest {
assertThat(deserialized.newScan()).isNotNull();
assertThat(deserialized.newRead()).isNotNull();
}
+
+ @Test
+ public void testCreateReaderWithCsvSplit() throws IOException {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("name", DataTypes.STRING())
+ .field("score", DataTypes.DOUBLE())
+ .build();
+
+ Map<String, String> options = new HashMap<>();
+ options.put("file.format", "csv");
+
+ Path tablePath = new Path(tempPath.toUri());
+ LocalFileIO fileIO = LocalFileIO.create();
+ FormatTable table =
+ FormatTable.builder()
+ .fileIO(fileIO)
+ .identifier(Identifier.create("test_db", "csv_table"))
+ .rowType(rowType)
+ .partitionKeys(Arrays.asList())
+ .location(tablePath.toString())
+ .format(FormatTable.Format.CSV)
+ .options(options)
+ .build();
+
+ FormatReadBuilder readBuilder = new FormatReadBuilder(table);
+
+ // Create CSV test data
+ Path csvFile = new Path(tablePath, "test_data.csv");
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"),
95.5),
+ GenericRow.of(2, BinaryString.fromString("Bob"), 87.3),
+ GenericRow.of(3, BinaryString.fromString("Charlie"),
92.8));
+
+ // Write CSV file
+ CsvFileFormat csvFormat =
+ new CsvFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
+ FormatWriterFactory writerFactory = csvFormat.createWriterFactory(rowType);
+ try (PositionOutputStream out = fileIO.newOutputStream(csvFile, false);
+ FormatWriter writer = writerFactory.create(out, "none")) {
+ for (InternalRow row : testData) {
+ writer.addElement(row);
+ }
+ }
+
+ long fileSize = fileIO.getFileSize(csvFile);
+
+ // Test 1: Read entire CSV file (offset = 0, length = fileSize)
+ FormatDataSplit fullSplit = new FormatDataSplit(csvFile, fileSize, null);
+ RecordReader<InternalRow> fullReader = readBuilder.createReader(fullSplit);
+ List<InternalRow> fullResult = readAllRows(fullReader, rowType);
+
+ assertThat(fullResult).hasSize(3);
+ assertThat(fullResult.get(0).getInt(0)).isEqualTo(1);
+ assertThat(fullResult.get(0).getString(1).toString()).isEqualTo("Alice");
+ assertThat(fullResult.get(0).getDouble(2)).isEqualTo(95.5);
+ assertThat(fullResult.get(1).getInt(0)).isEqualTo(2);
+ assertThat(fullResult.get(1).getString(1).toString()).isEqualTo("Bob");
+ assertThat(fullResult.get(1).getDouble(2)).isEqualTo(87.3);
+ assertThat(fullResult.get(2).getInt(0)).isEqualTo(3);
+ assertThat(fullResult.get(2).getString(1).toString()).isEqualTo("Charlie");
+ assertThat(fullResult.get(2).getDouble(2)).isEqualTo(92.8);
+
+ // Test 2: Read CSV with offset and length (partial read)
+ // Read from offset 0 with a limited length (first 2 lines approximately)
+ long partialLength = fileSize / 2;
+ FormatDataSplit partialSplit =
+ new FormatDataSplit(csvFile, fileSize, 0, partialLength, null);
+ RecordReader<InternalRow> partialReader = readBuilder.createReader(partialSplit);
+ List<InternalRow> partialResult = readAllRows(partialReader, rowType);
+
+ // Verify we get at least some rows (exact count depends on line boundaries)
+ assertThat(partialResult).isNotEmpty();
+ assertThat(partialResult.get(0).getInt(0)).isEqualTo(1);
+ assertThat(partialResult.get(0).getString(1).toString()).isEqualTo("Alice");
+ }
+
+ private List<InternalRow> readAllRows(RecordReader<InternalRow> reader, RowType rowType)
+ throws IOException {
+ InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ List<InternalRow> result = new ArrayList<>();
+ try (RecordReader<InternalRow> r = reader) {
+ r.forEachRemaining(row -> result.add(serializer.copy(row)));
+ }
+ return result;
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
index ac49164ea8..c2d45f1c62 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
@@ -18,14 +18,19 @@
package org.apache.paimon.table.format;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.format.json.JsonOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.DataTypes;
@@ -37,12 +42,15 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.utils.PartitionPathUtils.searchPartSpecAndPaths;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -668,4 +676,130 @@ public class FormatTableScanTest {
assertThat(result).isEmpty();
}
+
+ @TestTemplate
+ public void testCreateSplitsWithMultipleFiles() throws IOException {
+ for (String format : Arrays.asList("csv", "json")) {
+ Path tableLocation = new Path(new Path(tmpPath.toUri()), format);
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path file1 = new Path(tableLocation, "data1." + format);
+ Path file2 = new Path(tableLocation, "data2." + format);
+ Path file3 = new Path(tableLocation, "data3." + format);
+ writeTestFile(fileIO, file1, 80);
+ writeTestFile(fileIO, file2, 120);
+ writeTestFile(fileIO, file3, 200);
+
+ Map<String, String> options = new HashMap<>();
+ options.put(SOURCE_SPLIT_TARGET_SIZE.key(), "100b");
+
+ FormatTable formatTable =
+ createFormatTableWithOptions(
+ tableLocation,
+ FormatTable.Format.valueOf(format.toUpperCase()),
+ options);
+ FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+ List<Split> splits = scan.plan().splits();
+ assertThat(splits).hasSize(5);
+ }
+ }
+
+ @TestTemplate
+ public void testCreateSplitsWhenDefineLineDelimiter() throws IOException {
+ for (String format : Arrays.asList("csv", "json")) {
+ Path tableLocation = new Path(new Path(tmpPath.toUri()), format);
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path file1 = new Path(tableLocation, "data1." + format);
+ Path file2 = new Path(tableLocation, "data2." + format);
+ Path file3 = new Path(tableLocation, "data3." + format);
+ writeTestFile(fileIO, file1, 80);
+ writeTestFile(fileIO, file2, 120);
+ writeTestFile(fileIO, file3, 200);
+
+ Map<String, String> options = new HashMap<>();
+ options.put(SOURCE_SPLIT_TARGET_SIZE.key(), "100b");
+ if ("csv".equals(format)) {
+ options.put(CsvOptions.LINE_DELIMITER.key(), "\001");
+ } else {
+ options.put(JsonOptions.LINE_DELIMITER.key(), "\001");
+ }
+
+ FormatTable formatTable =
+ createFormatTableWithOptions(
+ tableLocation,
+ FormatTable.Format.valueOf(format.toUpperCase()),
+ options);
+ FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+ List<Split> splits = scan.plan().splits();
+ assertThat(splits).hasSize(3);
+ }
+ }
+
+ @TestTemplate
+ public void testCreateSplitsWithParquetFile() throws IOException {
+ Path tableLocation = new Path(tmpPath.toUri());
+ LocalFileIO fileIO = LocalFileIO.create();
+
+ // Create a large Parquet file (non-splittable)
+ Path parquetFile = new Path(tableLocation, "data.parquet");
+ long fileSize = 300; // 300 bytes
+ writeTestFile(fileIO, parquetFile, fileSize);
+
+ // Set split max size to 100 bytes
+ Map<String, String> options = new HashMap<>();
+ options.put(SOURCE_SPLIT_TARGET_SIZE.key(), "100b");
+
+ FormatTable formatTable =
+ createFormatTableWithOptions(tableLocation, FormatTable.Format.PARQUET, options);
+ FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+ List<Split> splits = scan.plan().splits();
+
+ // Parquet files should NOT be split, should be a single split
+ assertThat(splits).hasSize(1);
+ FormatDataSplit split = (FormatDataSplit) splits.get(0);
+ assertThat(split.filePath()).isEqualTo(parquetFile);
+ assertThat(split.offset()).isEqualTo(0);
+ }
+
+ @TestTemplate
+ public void testCreateSplitsWithEmptyDirectory() throws IOException {
+ Path tableLocation = new Path(tmpPath.toUri());
+ LocalFileIO fileIO = LocalFileIO.create();
+ fileIO.mkdirs(tableLocation);
+
+ FormatTable formatTable =
+ createFormatTableWithOptions(
+ tableLocation, FormatTable.Format.CSV, Collections.emptyMap());
+ FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+ List<Split> splits = scan.plan().splits();
+
+ assertThat(splits).isEmpty();
+ }
+
+ private void writeTestFile(LocalFileIO fileIO, Path filePath, long size) throws IOException {
+ fileIO.mkdirs(filePath.getParent());
+ try (OutputStream out = fileIO.newOutputStream(filePath, false)) {
+ byte[] data = new byte[(int) size];
+ Arrays.fill(data, (byte) 'a');
+ out.write(data);
+ }
+ }
+
+ private FormatTable createFormatTableWithOptions(
+ Path tableLocation, FormatTable.Format format, Map<String, String> options) {
+ RowType rowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("name", DataTypes.STRING())
+ .build();
+
+ return FormatTable.builder()
+ .fileIO(LocalFileIO.create())
+ .identifier(Identifier.create("test_db", "test_table"))
+ .rowType(rowType)
+ .partitionKeys(Collections.emptyList())
+ .location(tableLocation.toString())
+ .format(format)
+ .options(options)
+ .build();
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
index cfe3185bac..8863a259ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
@@ -30,7 +30,17 @@ import scala.collection.JavaConverters._
case class FormatTableStatistics[T <: PaimonFormatTableBaseScan](scan: T) extends Statistics {
private lazy val fileTotalSize: Long =
- scan.getOriginSplits.map(_.asInstanceOf[FormatDataSplit]).map(_.length()).sum
+ scan.getOriginSplits
+ .map(_.asInstanceOf[FormatDataSplit])
+ .map(
+ split => {
+ if (split.length() != null) {
+ split.length().longValue()
+ } else {
+ split.fileSize()
+ }
+ })
+ .sum
override def sizeInBytes(): OptionalLong = {
val size = fileTotalSize /