This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 10b9c74989 [core] format table: support file split (#6556)
10b9c74989 is described below

commit 10b9c749897db97dd030842a7f3fe6292a8f1361
Author: jerry <[email protected]>
AuthorDate: Mon Nov 10 18:57:52 2025 +0800

    [core] format table: support file split (#6556)
---
 .../org/apache/paimon/io/DataFileRecordReader.java |  18 +++
 .../paimon/table/format/FormatDataSplit.java       |  30 +++--
 .../paimon/table/format/FormatReadBuilder.java     |  34 +++---
 .../paimon/table/format/FormatTableScan.java       |  48 +++++++-
 .../org/apache/paimon/catalog/CatalogTestBase.java |  42 +++++++
 .../paimon/table/format/FormatDataSplitTest.java   |  24 ++--
 .../paimon/table/format/FormatReadBuilderTest.java | 101 ++++++++++++++++
 .../paimon/table/format/FormatTableScanTest.java   | 134 +++++++++++++++++++++
 .../paimon/spark/FormatTableStatistics.scala       |  12 +-
 9 files changed, 412 insertions(+), 31 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 5f8a217b98..3f9e377a6b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -39,6 +39,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
@@ -84,6 +85,23 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
         this.selection = context.selection();
     }
 
+    public DataFileRecordReader(
+            RowType tableRowType,
+            FileRecordReader<InternalRow> reader,
+            @Nullable PartitionInfo partitionInfo)
+            throws IOException {
+        this.tableRowType = tableRowType;
+        this.reader = reader;
+        this.indexMapping = null;
+        this.partitionInfo = partitionInfo;
+        this.castMapping = null;
+        this.rowTrackingEnabled = false;
+        this.firstRowId = null;
+        this.maxSequenceNumber = 0L;
+        this.systemFields = Collections.emptyMap();
+        this.selection = null;
+    }
+
     @Nullable
     @Override
     public FileRecordIterator<InternalRow> readBatch() throws IOException {
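
Note: the new three-argument constructor above wraps an already-built
format-level reader, with index mapping, cast mapping, and row tracking
disabled. A minimal usage sketch, assuming the inner reader was produced
elsewhere (createFormatReader is a hypothetical helper; variable names are
illustrative, not from the patch):

    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.io.DataFileRecordReader;
    import org.apache.paimon.reader.FileRecordReader;
    import org.apache.paimon.types.RowType;

    // Wrap a per-file format reader as a table-level reader; the
    // PartitionInfo argument may be null for unpartitioned reads.
    FileRecordReader<InternalRow> formatReader = createFormatReader(); // hypothetical
    DataFileRecordReader reader = new DataFileRecordReader(rowType, formatReader, null);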
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index 86b26a1351..e228b043f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -29,33 +29,48 @@ import java.util.Objects;
 /** {@link FormatDataSplit} for format table. */
 public class FormatDataSplit implements Split {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final Path filePath;
+    private final long fileSize;
     private final long offset;
-    private final long length;
+    @Nullable private final Long length;
     @Nullable private final BinaryRow partition;
 
-    public FormatDataSplit(Path filePath, long offset, long length, @Nullable BinaryRow partition) {
+    public FormatDataSplit(
+            Path filePath,
+            long fileSize,
+            long offset,
+            @Nullable Long length,
+            @Nullable BinaryRow partition) {
         this.filePath = filePath;
+        this.fileSize = fileSize;
         this.offset = offset;
         this.length = length;
         this.partition = partition;
     }
 
+    public FormatDataSplit(Path filePath, long fileSize, @Nullable BinaryRow partition) {
+        this(filePath, fileSize, 0L, null, partition);
+    }
+
     public Path filePath() {
         return this.filePath;
     }
 
     public Path dataPath() {
-        return filePath;
+        return this.filePath;
+    }
+
+    public long fileSize() {
+        return this.fileSize;
     }
 
     public long offset() {
         return offset;
     }
 
-    public long length() {
+    public Long length() {
         return length;
     }
 
@@ -78,13 +93,14 @@ public class FormatDataSplit implements Split {
         }
         FormatDataSplit that = (FormatDataSplit) o;
         return offset == that.offset
-                && length == that.length
+                && fileSize == that.fileSize
+                && Objects.equals(length, that.length)
                 && Objects.equals(filePath, that.filePath)
                 && Objects.equals(partition, that.partition);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(filePath, offset, length, partition);
+        return Objects.hash(filePath, fileSize, offset, length, partition);
     }
 }
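
Note: with serialVersionUID bumped to 2, a split now records the total file
size plus an optional byte range, where a null length means "read to the end
of the file". A short sketch of the two constructors (the path and sizes are
illustrative):

    import org.apache.paimon.fs.Path;
    import org.apache.paimon.table.format.FormatDataSplit;

    Path path = new Path("/warehouse/db/t/data.csv"); // illustrative path
    long fileSize = 1024L;
    // Whole-file split: offset 0, length null.
    FormatDataSplit whole = new FormatDataSplit(path, fileSize, null);
    // Ranged split covering bytes [100, 612) of the same file.
    FormatDataSplit ranged = new FormatDataSplit(path, fileSize, 100L, 512L, null);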
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 81ff015769..54bba0d6a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -30,6 +30,7 @@ import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -37,13 +38,13 @@ import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -157,7 +158,7 @@ public class FormatReadBuilder implements ReadBuilder {
     protected RecordReader<InternalRow> createReader(FormatDataSplit dataSplit) throws IOException {
         Path filePath = dataSplit.dataPath();
         FormatReaderContext formatReaderContext =
-                new FormatReaderContext(table.fileIO(), filePath, dataSplit.length(), null);
+                new FormatReaderContext(table.fileIO(), filePath, dataSplit.fileSize(), null);
         // Skip pushing down partition filters to reader.
         List<Predicate> readFilters =
                 excludePredicateWithFields(
@@ -172,18 +173,23 @@ public class FormatReadBuilder implements ReadBuilder {
         Pair<int[], RowType> partitionMapping =
                 PartitionUtils.getPartitionMapping(
                         table.partitionKeys(), readType().getFields(), table.partitionType());
-
-        return new DataFileRecordReader(
-                readType(),
-                readerFactory,
-                formatReaderContext,
-                null,
-                null,
-                PartitionUtils.create(partitionMapping, dataSplit.partition()),
-                false,
-                null,
-                0,
-                Collections.emptyMap());
+        try {
+            FileRecordReader<InternalRow> reader;
+            if (dataSplit.length() != null) {
+                reader =
+                        readerFactory.createReader(
+                                formatReaderContext, dataSplit.offset(), dataSplit.length());
+            } else {
+                reader = readerFactory.createReader(formatReaderContext);
+            }
+            return new DataFileRecordReader(
+                    readType(),
+                    reader,
+                    PartitionUtils.create(partitionMapping, dataSplit.partition()));
+        } catch (Exception e) {
+            FileUtils.checkExists(formatReaderContext.fileIO(), formatReaderContext.filePath());
+            throw e;
+        }
     }
 
    private static RowType getRowTypeWithoutPartition(RowType rowType, List<String> partitionKeys) {
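
Note: createReader now branches on the split's range. A non-null length routes
to the offset/length overload of the format reader factory, a null length
falls back to the whole-file overload, and a failure triggers a file-existence
check so missing files surface as a clearer error. A minimal read sketch going
through the public API (the table and filePath variables are assumed to exist;
names are illustrative):

    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.RecordReader;

    FormatReadBuilder readBuilder = new FormatReadBuilder(table);
    FormatDataSplit split = new FormatDataSplit(filePath, fileSize, null);
    try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split)) {
        reader.forEachRemaining(row -> System.out.println(row.getInt(0)));
    }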
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index ac4c29d134..96c3805470 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -22,6 +22,8 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.format.json.JsonOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -240,14 +242,58 @@ public class FormatTableScan implements InnerTableScan {
         FileStatus[] files = fileIO.listFiles(path, true);
         for (FileStatus file : files) {
             if (isDataFileName(file.getPath().getName())) {
+                List<FormatDataSplit> fileSplits =
+                        tryToSplitLargeFile(
+                                table.format(), file, coreOptions.splitTargetSize(), partition);
+                splits.addAll(fileSplits);
+            }
+        }
+        return splits;
+    }
+
+    private List<FormatDataSplit> tryToSplitLargeFile(
+            FormatTable.Format format, FileStatus file, long maxSplitBytes, BinaryRow partition) {
+        boolean isSplittableFile =
+                ((format == FormatTable.Format.CSV
+                                        && !table.options()
+                                                .containsKey(CsvOptions.LINE_DELIMITER.key()))
+                                || (format == FormatTable.Format.JSON
+                                        && !table.options()
+                                                .containsKey(JsonOptions.LINE_DELIMITER.key())))
+                        && isTextFileUncompressed(file.getPath().getName());
+        List<FormatDataSplit> splits = new ArrayList<>();
+        if (isSplittableFile && file.getLen() > maxSplitBytes) {
+            long remainingBytes = file.getLen();
+            long currentStart = 0;
+
+            while (remainingBytes > 0) {
+                long splitSize = Math.min(maxSplitBytes, remainingBytes);
+
                 FormatDataSplit split =
-                        new FormatDataSplit(file.getPath(), 0, file.getLen(), partition);
+                        new FormatDataSplit(
+                                file.getPath(), file.getLen(), currentStart, splitSize, partition);
                 splits.add(split);
+                currentStart += splitSize;
+                remainingBytes -= splitSize;
             }
+        } else {
+            splits.add(new FormatDataSplit(file.getPath(), file.getLen(), partition));
         }
         return splits;
     }
 
+    private static boolean isTextFileUncompressed(String fileName) {
+        if (fileName == null || fileName.trim().isEmpty()) {
+            return false;
+        }
+        String[] parts = fileName.split("\\.");
+        if (parts.length < 2) {
+            return false;
+        }
+        String lastExt = parts[parts.length - 1].toLowerCase();
+        return "csv".equals(lastExt) || "json".equals(lastExt);
+    }
+
    public static Map<String, String> extractLeadingEqualityPartitionSpecWhenOnlyAnd(
             List<String> partitionKeys, Predicate predicate) {
         List<Predicate> predicates = PredicateBuilder.splitAnd(predicate);
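
Note: the loop in tryToSplitLargeFile slices an uncompressed CSV/JSON file
into target-size chunks plus a shorter tail, while compressed text files,
files with a custom line delimiter, and other formats stay whole-file. A
worked sketch of the arithmetic, assuming a 250-byte file and a 100-byte
target (both values illustrative):

    // Prints: offset=0 length=100 / offset=100 length=100 / offset=200 length=50
    long fileLen = 250L, target = 100L;
    long start = 0L, remaining = fileLen;
    while (remaining > 0) {
        long size = Math.min(target, remaining);
        System.out.printf("offset=%d length=%d%n", start, size);
        start += size;
        remaining -= size;
    }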
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 4ddd6ab05f..2934cc3dbe 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -824,6 +824,48 @@ public abstract class CatalogTestBase {
         catalog.dropTable(pid, true);
     }
 
+    @Test
+    public void testFormatTableSplitRead() throws Exception {
+        if (!supportsFormatTable()) {
+            return;
+        }
+        Pair[] format2Compressions = {
+            Pair.of("csv", HadoopCompressionType.NONE),
+            Pair.of("json", HadoopCompressionType.NONE),
+            Pair.of("csv", HadoopCompressionType.GZIP),
+            Pair.of("json", HadoopCompressionType.GZIP),
+            Pair.of("parquet", HadoopCompressionType.ZSTD)
+        };
+        for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
+            String format = format2Compression.getKey();
+            String compression = format2Compression.getValue().value();
+            String dbName = format + "_split_db_" + compression;
+            catalog.createDatabase(dbName, true);
+
+            Identifier id = Identifier.create(dbName, format + "_split_table_" + compression);
+            Schema schema =
+                    Schema.newBuilder()
+                            .column("id", DataTypes.INT())
+                            .column("name", DataTypes.STRING())
+                            .column("score", DataTypes.DOUBLE())
+                            .options(getFormatTableOptions())
+                            .option("file.format", format)
+                            .option("source.split.target-size", "54 B")
+                            .option("file.compression", compression.toString())
+                            .build();
+            catalog.createTable(id, schema, true);
+            FormatTable table = (FormatTable) catalog.getTable(id);
+            int size = 50;
+            InternalRow[] datas = new InternalRow[size];
+            for (int i = 0; i < size; i++) {
+                datas[i] = GenericRow.of(i, BinaryString.fromString("User" + i), 85.5 + (i % 15));
+            }
+            writeAndCheckCommitFormatTable(table, datas, null);
+            List<InternalRow> allRows = read(table, null, null, null, null);
+            assertThat(allRows).containsExactlyInAnyOrder(datas);
+        }
+    }
+
     private void writeAndCheckCommitFormatTable(
             FormatTable table, InternalRow[] datas, InternalRow dataWithDiffPartition)
             throws Exception {
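
Note: in the test above, a "source.split.target-size" of 54 bytes forces the
uncompressed CSV/JSON files to be read through several ranged splits, while
the GZIP and Parquet variants fall back to whole-file splits; either way all
50 rows must round-trip unchanged. For an uncompressed text file the expected
split count is a ceiling division (values illustrative):

    // e.g. a 500-byte file with a 54-byte target yields 10 splits
    long fileLen = 500L, target = 54L;
    long expectedSplits = (fileLen + target - 1) / target;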
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
index d992796875..601941063d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
@@ -46,11 +46,7 @@ public class FormatDataSplitTest {
         Predicate predicate = builder.equal(0, 5);
 
         // Create FormatDataSplit
-        FormatDataSplit split =
-                new FormatDataSplit(
-                        filePath, 100L, // offset
-                        1024L, // length
-                        null);
+        FormatDataSplit split = new FormatDataSplit(filePath, 1024L, null);
 
         // Test Java serialization
         byte[] serialized = InstantiationUtil.serializeObject(split);
@@ -58,8 +54,20 @@ public class FormatDataSplitTest {
                 InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
 
         // Verify the deserialized object
-        assertThat(deserialized.filePath()).isEqualTo(filePath);
-        assertThat(deserialized.offset()).isEqualTo(100L);
-        assertThat(deserialized.length()).isEqualTo(1024L);
+        assertThat(deserialized.filePath()).isEqualTo(split.filePath());
+        assertThat(deserialized.offset()).isEqualTo(split.offset());
+        assertThat(deserialized.fileSize()).isEqualTo(split.fileSize());
+        assertThat(deserialized.length()).isEqualTo(split.length());
+
+        split = new FormatDataSplit(filePath, 1024L, 100L, 512L, null);
+
+        serialized = InstantiationUtil.serializeObject(split);
+        deserialized = InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+        // Verify the deserialized object
+        assertThat(deserialized.filePath()).isEqualTo(split.filePath());
+        assertThat(deserialized.offset()).isEqualTo(split.offset());
+        assertThat(deserialized.fileSize()).isEqualTo(split.fileSize());
+        assertThat(deserialized.length()).isEqualTo(split.length());
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
index 45cff37f3e..7b441e691d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
@@ -19,10 +19,21 @@
 package org.apache.paimon.table.format;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.csv.CsvFileFormat;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -32,8 +43,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -122,4 +135,92 @@ public class FormatReadBuilderTest {
         assertThat(deserialized.newScan()).isNotNull();
         assertThat(deserialized.newRead()).isNotNull();
     }
+
+    @Test
+    public void testCreateReaderWithCsvSplit() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("name", DataTypes.STRING())
+                        .field("score", DataTypes.DOUBLE())
+                        .build();
+
+        Map<String, String> options = new HashMap<>();
+        options.put("file.format", "csv");
+
+        Path tablePath = new Path(tempPath.toUri());
+        LocalFileIO fileIO = LocalFileIO.create();
+        FormatTable table =
+                FormatTable.builder()
+                        .fileIO(fileIO)
+                        .identifier(Identifier.create("test_db", "csv_table"))
+                        .rowType(rowType)
+                        .partitionKeys(Arrays.asList())
+                        .location(tablePath.toString())
+                        .format(FormatTable.Format.CSV)
+                        .options(options)
+                        .build();
+
+        FormatReadBuilder readBuilder = new FormatReadBuilder(table);
+
+        // Create CSV test data
+        Path csvFile = new Path(tablePath, "test_data.csv");
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("Alice"), 
95.5),
+                        GenericRow.of(2, BinaryString.fromString("Bob"), 87.3),
+                        GenericRow.of(3, BinaryString.fromString("Charlie"), 
92.8));
+
+        // Write CSV file
+        CsvFileFormat csvFormat =
+                new CsvFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
+        FormatWriterFactory writerFactory = csvFormat.createWriterFactory(rowType);
+        try (PositionOutputStream out = fileIO.newOutputStream(csvFile, false);
+                FormatWriter writer = writerFactory.create(out, "none")) {
+            for (InternalRow row : testData) {
+                writer.addElement(row);
+            }
+        }
+
+        long fileSize = fileIO.getFileSize(csvFile);
+
+        // Test 1: Read entire CSV file (offset = 0, length = fileSize)
+        FormatDataSplit fullSplit = new FormatDataSplit(csvFile, fileSize, null);
+        RecordReader<InternalRow> fullReader = readBuilder.createReader(fullSplit);
+        List<InternalRow> fullResult = readAllRows(fullReader, rowType);
+
+        assertThat(fullResult).hasSize(3);
+        assertThat(fullResult.get(0).getInt(0)).isEqualTo(1);
+        assertThat(fullResult.get(0).getString(1).toString()).isEqualTo("Alice");
+        assertThat(fullResult.get(0).getDouble(2)).isEqualTo(95.5);
+        assertThat(fullResult.get(1).getInt(0)).isEqualTo(2);
+        assertThat(fullResult.get(1).getString(1).toString()).isEqualTo("Bob");
+        assertThat(fullResult.get(1).getDouble(2)).isEqualTo(87.3);
+        assertThat(fullResult.get(2).getInt(0)).isEqualTo(3);
+        assertThat(fullResult.get(2).getString(1).toString()).isEqualTo("Charlie");
+        assertThat(fullResult.get(2).getDouble(2)).isEqualTo(92.8);
+
+        // Test 2: Read CSV with offset and length (partial read)
+        // Read from offset 0 with a limited length (first 2 lines approximately)
+        long partialLength = fileSize / 2;
+        FormatDataSplit partialSplit =
+                new FormatDataSplit(csvFile, fileSize, 0, partialLength, null);
+        RecordReader<InternalRow> partialReader = readBuilder.createReader(partialSplit);
+        List<InternalRow> partialResult = readAllRows(partialReader, rowType);
+
+        // Verify we get at least some rows (exact count depends on line boundaries)
+        assertThat(partialResult).isNotEmpty();
+        assertThat(partialResult.get(0).getInt(0)).isEqualTo(1);
+        assertThat(partialResult.get(0).getString(1).toString()).isEqualTo("Alice");
+    }
+
+    private List<InternalRow> readAllRows(RecordReader<InternalRow> reader, RowType rowType)
+            throws IOException {
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        List<InternalRow> result = new ArrayList<>();
+        try (RecordReader<InternalRow> r = reader) {
+            r.forEachRemaining(row -> result.add(serializer.copy(row)));
+        }
+        return result;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
index ac49164ea8..c2d45f1c62 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
@@ -18,14 +18,19 @@
 
 package org.apache.paimon.table.format;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.format.json.JsonOptions;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
 import org.apache.paimon.testutils.junit.parameterized.Parameters;
 import org.apache.paimon.types.DataTypes;
@@ -37,12 +42,15 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static org.apache.paimon.utils.PartitionPathUtils.searchPartSpecAndPaths;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -668,4 +676,130 @@ public class FormatTableScanTest {
 
         assertThat(result).isEmpty();
     }
+
+    @TestTemplate
+    public void testCreateSplitsWithMultipleFiles() throws IOException {
+        for (String format : Arrays.asList("csv", "json")) {
+            Path tableLocation = new Path(new Path(tmpPath.toUri()), format);
+            LocalFileIO fileIO = LocalFileIO.create();
+            Path file1 = new Path(tableLocation, "data1." + format);
+            Path file2 = new Path(tableLocation, "data2." + format);
+            Path file3 = new Path(tableLocation, "data3." + format);
+            writeTestFile(fileIO, file1, 80);
+            writeTestFile(fileIO, file2, 120);
+            writeTestFile(fileIO, file3, 200);
+
+            Map<String, String> options = new HashMap<>();
+            options.put(SOURCE_SPLIT_TARGET_SIZE.key(), "100b");
+
+            FormatTable formatTable =
+                    createFormatTableWithOptions(
+                            tableLocation,
+                            FormatTable.Format.valueOf(format.toUpperCase()),
+                            options);
+            FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+            List<Split> splits = scan.plan().splits();
+            assertThat(splits).hasSize(5);
+        }
+    }
+
+    @TestTemplate
+    public void testCreateSplitsWhenDefineLineDelimiter() throws IOException {
+        for (String format : Arrays.asList("csv", "json")) {
+            Path tableLocation = new Path(new Path(tmpPath.toUri()), format);
+            LocalFileIO fileIO = LocalFileIO.create();
+            Path file1 = new Path(tableLocation, "data1." + format);
+            Path file2 = new Path(tableLocation, "data2." + format);
+            Path file3 = new Path(tableLocation, "data3." + format);
+            writeTestFile(fileIO, file1, 80);
+            writeTestFile(fileIO, file2, 120);
+            writeTestFile(fileIO, file3, 200);
+
+            Map<String, String> options = new HashMap<>();
+            options.put(SOURCE_SPLIT_TARGET_SIZE.key(), "100b");
+            if ("csv".equals(format)) {
+                options.put(CsvOptions.LINE_DELIMITER.key(), "\001");
+            } else {
+                options.put(JsonOptions.LINE_DELIMITER.key(), "\001");
+            }
+
+            FormatTable formatTable =
+                    createFormatTableWithOptions(
+                            tableLocation,
+                            FormatTable.Format.valueOf(format.toUpperCase()),
+                            options);
+            FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+            List<Split> splits = scan.plan().splits();
+            assertThat(splits).hasSize(3);
+        }
+    }
+
+    @TestTemplate
+    public void testCreateSplitsWithParquetFile() throws IOException {
+        Path tableLocation = new Path(tmpPath.toUri());
+        LocalFileIO fileIO = LocalFileIO.create();
+
+        // Create a large Parquet file (non-splittable)
+        Path parquetFile = new Path(tableLocation, "data.parquet");
+        long fileSize = 300; // 300 bytes
+        writeTestFile(fileIO, parquetFile, fileSize);
+
+        // Set split max size to 100 bytes
+        Map<String, String> options = new HashMap<>();
+        options.put(SOURCE_SPLIT_TARGET_SIZE.key(), "100b");
+
+        FormatTable formatTable =
+                createFormatTableWithOptions(tableLocation, FormatTable.Format.PARQUET, options);
+        FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+        List<Split> splits = scan.plan().splits();
+
+        // Parquet files should NOT be split, should be a single split
+        assertThat(splits).hasSize(1);
+        FormatDataSplit split = (FormatDataSplit) splits.get(0);
+        assertThat(split.filePath()).isEqualTo(parquetFile);
+        assertThat(split.offset()).isEqualTo(0);
+    }
+
+    @TestTemplate
+    public void testCreateSplitsWithEmptyDirectory() throws IOException {
+        Path tableLocation = new Path(tmpPath.toUri());
+        LocalFileIO fileIO = LocalFileIO.create();
+        fileIO.mkdirs(tableLocation);
+
+        FormatTable formatTable =
+                createFormatTableWithOptions(
+                        tableLocation, FormatTable.Format.CSV, Collections.emptyMap());
+        FormatTableScan scan = new FormatTableScan(formatTable, null, null);
+        List<Split> splits = scan.plan().splits();
+
+        assertThat(splits).isEmpty();
+    }
+
+    private void writeTestFile(LocalFileIO fileIO, Path filePath, long size) throws IOException {
+        fileIO.mkdirs(filePath.getParent());
+        try (OutputStream out = fileIO.newOutputStream(filePath, false)) {
+            byte[] data = new byte[(int) size];
+            Arrays.fill(data, (byte) 'a');
+            out.write(data);
+        }
+    }
+
+    private FormatTable createFormatTableWithOptions(
+            Path tableLocation, FormatTable.Format format, Map<String, String> options) {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("name", DataTypes.STRING())
+                        .build();
+
+        return FormatTable.builder()
+                .fileIO(LocalFileIO.create())
+                .identifier(Identifier.create("test_db", "test_table"))
+                .rowType(rowType)
+                .partitionKeys(Collections.emptyList())
+                .location(tableLocation.toString())
+                .format(format)
+                .options(options)
+                .build();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
index cfe3185bac..8863a259ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
@@ -30,7 +30,17 @@ import scala.collection.JavaConverters._
 case class FormatTableStatistics[T <: PaimonFormatTableBaseScan](scan: T) extends Statistics {
 
   private lazy val fileTotalSize: Long =
-    scan.getOriginSplits.map(_.asInstanceOf[FormatDataSplit]).map(_.length()).sum
+    scan.getOriginSplits
+      .map(_.asInstanceOf[FormatDataSplit])
+      .map(
+        split => {
+          if (split.length() != null) {
+            split.length().longValue()
+          } else {
+            split.fileSize()
+          }
+        })
+      .sum
 
   override def sizeInBytes(): OptionalLong = {
     val size = fileTotalSize /
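
Note: the statistics change above makes fileTotalSize null-safe; per split the
contribution reduces to the following fallback (written as a Java sketch for
consistency with the rest of the patch; the split variable is illustrative):

    // Ranged splits report their byte range, whole-file splits (length null)
    // report the full file size.
    long bytes = split.length() != null ? split.length() : split.fileSize();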
