This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit b4dc68922fa6354844ddaf5093482e38a39aeb97
Author: JingsongLi <[email protected]>
AuthorDate: Mon Nov 10 19:25:04 2025 +0800

    [format] Refactor split in FormatTableScan
---
 .../org/apache/paimon/io/DataFileRecordReader.java | 56 ++++++++++-------
 .../paimon/table/format/FormatDataSplit.java       |  1 +
 .../paimon/table/format/FormatReadBuilder.java     | 18 ++++--
 .../paimon/table/format/FormatTableScan.java       | 71 +++++++++++-----------
 .../paimon/format/text/HadoopCompressionUtils.java | 16 +++--
 .../apache/paimon/format/text/TextLineReader.java  | 15 +++--
 6 files changed, 106 insertions(+), 71 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 3f9e377a6b..2584aef00f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -39,7 +39,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
@@ -68,13 +67,32 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
             long maxSequenceNumber,
             Map<String, Integer> systemFields)
             throws IOException {
+        this(
+                tableRowType,
+                createReader(readerFactory, context),
+                indexMapping,
+                castMapping,
+                partitionInfo,
+                rowTrackingEnabled,
+                firstRowId,
+                maxSequenceNumber,
+                systemFields,
+                context.selection());
+    }
+
+    public DataFileRecordReader(
+            RowType tableRowType,
+            FileRecordReader<InternalRow> reader,
+            @Nullable int[] indexMapping,
+            @Nullable CastFieldGetter[] castMapping,
+            @Nullable PartitionInfo partitionInfo,
+            boolean rowTrackingEnabled,
+            @Nullable Long firstRowId,
+            long maxSequenceNumber,
+            Map<String, Integer> systemFields,
+            @Nullable RoaringBitmap32 selection) {
         this.tableRowType = tableRowType;
-        try {
-            this.reader = readerFactory.createReader(context);
-        } catch (Exception e) {
-            FileUtils.checkExists(context.fileIO(), context.filePath());
-            throw e;
-        }
+        this.reader = reader;
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
@@ -82,24 +100,18 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
         this.firstRowId = firstRowId;
         this.maxSequenceNumber = maxSequenceNumber;
         this.systemFields = systemFields;
-        this.selection = context.selection();
+        this.selection = selection;
     }
 
-    public DataFileRecordReader(
-            RowType tableRowType,
-            FileRecordReader<InternalRow> reader,
-            @Nullable PartitionInfo partitionInfo)
+    private static FileRecordReader<InternalRow> createReader(
+            FormatReaderFactory readerFactory, FormatReaderFactory.Context context)
             throws IOException {
-        this.tableRowType = tableRowType;
-        this.reader = reader;
-        this.indexMapping = null;
-        this.partitionInfo = partitionInfo;
-        this.castMapping = null;
-        this.rowTrackingEnabled = false;
-        this.firstRowId = null;
-        this.maxSequenceNumber = 0L;
-        this.systemFields = Collections.emptyMap();
-        this.selection = null;
+        try {
+            return readerFactory.createReader(context);
+        } catch (Exception e) {
+            FileUtils.checkExists(context.fileIO(), context.filePath());
+            throw e;
+        }
     }
 
     @Nullable
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index e228b043f8..e446337641 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -70,6 +70,7 @@ public class FormatDataSplit implements Split {
         return offset;
     }
 
+    @Nullable
     public Long length() {
         return length;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 54bba0d6a3..449ae74029 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -45,6 +45,7 @@ import org.apache.paimon.utils.Pair;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +56,7 @@ import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
 import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
 import static org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link ReadBuilder} for {@link FormatTable}. */
 public class FormatReadBuilder implements ReadBuilder {
@@ -175,17 +177,25 @@ public class FormatReadBuilder implements ReadBuilder {
                         table.partitionKeys(), readType().getFields(), table.partitionType());
         try {
             FileRecordReader<InternalRow> reader;
-            if (dataSplit.length() != null) {
+            Long length = dataSplit.length();
+            if (length != null) {
                 reader =
-                        readerFactory.createReader(
-                                formatReaderContext, dataSplit.offset(), dataSplit.length());
+                        readerFactory.createReader(formatReaderContext, dataSplit.offset(), length);
             } else {
+                checkArgument(dataSplit.offset() == 0, "Offset must be 0.");
                 reader = readerFactory.createReader(formatReaderContext);
             }
             return new DataFileRecordReader(
                     readType(),
                     reader,
-                    PartitionUtils.create(partitionMapping, dataSplit.partition()));
+                    null,
+                    null,
+                    PartitionUtils.create(partitionMapping, dataSplit.partition()),
+                    false,
+                    null,
+                    0,
+                    Collections.emptyMap(),
+                    null);
         } catch (Exception e) {
             FileUtils.checkExists(formatReaderContext.fileIO(), formatReaderContext.filePath());
             throw e;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index 96c3805470..20cbb9b31d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -28,6 +28,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.partition.PartitionPredicate.DefaultPartitionPredicate;
 import org.apache.paimon.partition.PartitionPredicate.MultiplePartitionPredicate;
@@ -49,12 +50,15 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.paimon.format.text.HadoopCompressionUtils.isCompressed;
+import static org.apache.paimon.format.text.TextLineReader.isDefaultDelimiter;
 import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
 import static org.apache.paimon.utils.PartitionPathUtils.searchPartSpecAndPaths;
 
@@ -65,6 +69,8 @@ public class FormatTableScan implements InnerTableScan {
     private final CoreOptions coreOptions;
     @Nullable private PartitionPredicate partitionFilter;
     @Nullable private final Integer limit;
+    private final long targetSplitSize;
+    private final FormatTable.Format format;
 
     public FormatTableScan(
             FormatTable table,
@@ -74,6 +80,8 @@ public class FormatTableScan implements InnerTableScan {
         this.coreOptions = new CoreOptions(table.options());
         this.partitionFilter = partitionFilter;
         this.limit = limit;
+        this.targetSplitSize = coreOptions.splitTargetSize();
+        this.format = table.format();
     }
 
     @Override
@@ -242,56 +250,51 @@ public class FormatTableScan implements InnerTableScan {
         FileStatus[] files = fileIO.listFiles(path, true);
         for (FileStatus file : files) {
             if (isDataFileName(file.getPath().getName())) {
-                List<FormatDataSplit> fileSplits =
-                        tryToSplitLargeFile(
-                                table.format(), file, coreOptions.splitTargetSize(), partition);
+                List<FormatDataSplit> fileSplits = tryToSplitLargeFile(file, partition);
                 splits.addAll(fileSplits);
             }
         }
         return splits;
     }
 
-    private List<FormatDataSplit> tryToSplitLargeFile(
-            FormatTable.Format format, FileStatus file, long maxSplitBytes, BinaryRow partition) {
-        boolean isSplittableFile =
-                ((format == FormatTable.Format.CSV
-                                        && !table.options()
-                                                .containsKey(CsvOptions.LINE_DELIMITER.key()))
-                                || (format == FormatTable.Format.JSON
-                                        && !table.options()
-                                                .containsKey(JsonOptions.LINE_DELIMITER.key())))
-                        && isTextFileUncompressed(file.getPath().getName());
+    private List<FormatDataSplit> tryToSplitLargeFile(FileStatus file, BinaryRow partition) {
+        if (!preferToSplitFile(file)) {
+            return Collections.singletonList(
+                    new FormatDataSplit(file.getPath(), file.getLen(), partition));
+        }
         List<FormatDataSplit> splits = new ArrayList<>();
-        if (isSplittableFile && file.getLen() > maxSplitBytes) {
-            long remainingBytes = file.getLen();
-            long currentStart = 0;
+        long remainingBytes = file.getLen();
+        long currentStart = 0;
 
-            while (remainingBytes > 0) {
-                long splitSize = Math.min(maxSplitBytes, remainingBytes);
+        while (remainingBytes > 0) {
+            long splitSize = Math.min(targetSplitSize, remainingBytes);
 
-                FormatDataSplit split =
-                        new FormatDataSplit(
-                                file.getPath(), file.getLen(), currentStart, splitSize, partition);
-                splits.add(split);
-                currentStart += splitSize;
-                remainingBytes -= splitSize;
-            }
-        } else {
-            splits.add(new FormatDataSplit(file.getPath(), file.getLen(), partition));
+            FormatDataSplit split =
+                    new FormatDataSplit(
+                            file.getPath(), file.getLen(), currentStart, splitSize, partition);
+            splits.add(split);
+            currentStart += splitSize;
+            remainingBytes -= splitSize;
         }
         return splits;
     }
 
-    private static boolean isTextFileUncompressed(String fileName) {
-        if (fileName == null || fileName.trim().isEmpty()) {
+    private boolean preferToSplitFile(FileStatus file) {
+        if (file.getLen() <= targetSplitSize) {
             return false;
         }
-        String[] parts = fileName.split("\\.");
-        if (parts.length < 2) {
-            return false;
+
+        Options options = coreOptions.toConfiguration();
+        switch (format) {
+            case CSV:
+                return !isCompressed(file.getPath())
+                        && isDefaultDelimiter(options.get(CsvOptions.LINE_DELIMITER));
+            case JSON:
+                return !isCompressed(file.getPath())
+                        && isDefaultDelimiter(options.get(JsonOptions.LINE_DELIMITER));
+            default:
+                return false;
         }
-        String lastExt = parts[parts.length - 1].toLowerCase();
-        return "csv".equals(lastExt) || "json".equals(lastExt);
     }
 
     public static Map<String, String> extractLeadingEqualityPartitionSpecWhenOnlyAnd(
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
index 0a292c4d8b..6ab7734872 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
@@ -69,11 +69,7 @@ public class HadoopCompressionUtils {
                 return inputStream;
             }
 
-            CompressionCodecFactory codecFactory =
-                    new CompressionCodecFactory(new Configuration(false));
-
-            CompressionCodec codec =
-                    codecFactory.getCodec(new org.apache.hadoop.fs.Path(filePath.toString()));
+            CompressionCodec codec = getCompressionCodec(filePath);
             if (codec != null) {
                 return codec.createInputStream(inputStream);
             }
@@ -83,6 +79,16 @@ public class HadoopCompressionUtils {
         }
     }
 
+    public static boolean isCompressed(Path filePath) {
+        return getCompressionCodec(filePath) != null;
+    }
+
+    public static CompressionCodec getCompressionCodec(Path filePath) {
+        CompressionCodecFactory codecFactory =
+                new CompressionCodecFactory(new Configuration(false));
+        return codecFactory.getCodec(new org.apache.hadoop.fs.Path(filePath.toString()));
+    }
+
     /**
      * Gets a compression codec by compression type.
      *
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
index 50fb00c66b..e65d647ee2 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
@@ -34,18 +34,21 @@ public interface TextLineReader extends Closeable {
     static TextLineReader create(
             InputStream inputStream, String delimiter, long offset, @Nullable Long length)
             throws IOException {
-        byte[] delimiterBytes =
-                delimiter != null && !"\n".equals(delimiter)
-                        ? delimiter.getBytes(StandardCharsets.UTF_8)
-                        : null;
-        if (delimiterBytes == null || delimiterBytes.length == 0) {
+        if (isDefaultDelimiter(delimiter)) {
             return new StandardLineReader(inputStream, offset, length);
         } else {
             if (offset != 0 || length != null) {
                 throw new UnsupportedOperationException(
                         "Custom line text file does not support offset and length.");
             }
-            return new CustomLineReader(inputStream, delimiterBytes);
+            return new CustomLineReader(inputStream, delimiter.getBytes(StandardCharsets.UTF_8));
         }
     }
+
+    static boolean isDefaultDelimiter(String delimiter) {
+        return delimiter == null
+                || "\n".equals(delimiter)
+                || "\r\n".equals(delimiter)
+                || "\r".equals(delimiter);
+    }
 }

Reply via email to