This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d2833d2ca [core] support format table read (#6095)
7d2833d2ca is described below

commit 7d2833d2ca1aa9c6f8fc7e29b19d7d581b0e7eb6
Author: jerry <[email protected]>
AuthorDate: Wed Sep 3 11:50:38 2025 +0800

    [core] support format table read (#6095)
---
 .../apache/paimon/partition/PartitionUtils.java    |  37 ++--
 .../java/org/apache/paimon/table/FormatTable.java  |  58 +++---
 .../paimon/table/format/FormatDataSplit.java       |  90 +++++++++
 .../paimon/table/format/FormatReadBuilder.java     | 205 +++++++++++++++++++++
 .../paimon/table/format/FormatTableRead.java       | 142 ++++++++++++++
 .../paimon/table/format/FormatTableScan.java       | 181 ++++++++++++++++++
 .../apache/paimon/utils/FileStorePathFactory.java  |  34 ++++
 .../apache/paimon/utils/PartitionPathUtils.java    |  61 ++++++
 .../org/apache/paimon/catalog/CatalogTestBase.java | 196 ++++++++++++++++++++
 .../paimon/table/format/FormatDataSplitTest.java   |  65 +++++++
 .../paimon/table/format/FormatTableScanTest.java   |  66 +++++++
 11 files changed, 1102 insertions(+), 33 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java
index 3f8c56467b..bfa1e2cb1d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java
@@ -40,31 +40,46 @@ public class PartitionUtils {
         if (dataSchema.partitionKeys().isEmpty()) {
             return Pair.of(null, dataFields);
         }
+        return getPartitionMapping2fieldsWithoutPartition(
+                dataSchema.partitionKeys(),
+                dataFields,
+                
dataSchema.projectedLogicalRowType(dataSchema.partitionKeys()));
+    }
 
-        List<String> partitionNames = dataSchema.partitionKeys();
-        List<DataField> fieldsWithoutPartition = new ArrayList<>();
+    public static Pair<int[], RowType> getPartitionMapping(
+            List<String> partitionKeys, List<DataField> dataFields, RowType 
partitionType) {
+        return getPartitionMapping2fieldsWithoutPartition(partitionKeys, 
dataFields, partitionType)
+                .getLeft();
+    }
+
+    public static Pair<Pair<int[], RowType>, List<DataField>>
+            getPartitionMapping2fieldsWithoutPartition(
+                    List<String> partitionKeys, List<DataField> dataFields, 
RowType partitionType) {
+        if (partitionKeys.isEmpty()) {
+            return Pair.of(null, dataFields);
+        }
 
+        List<DataField> fieldsWithoutPartition = new ArrayList<>();
         int[] map = new int[dataFields.size() + 1];
-        int pCount = 0;
+        int partitionFieldCount = 0;
+
         for (int i = 0; i < dataFields.size(); i++) {
             DataField field = dataFields.get(i);
-            if (partitionNames.contains(field.name())) {
+            if (partitionKeys.contains(field.name())) {
                 // if the map[i] is minus, represent the related column is 
stored in partition row
-                map[i] = -(partitionNames.indexOf(field.name()) + 1);
-                pCount++;
+                map[i] = -(partitionKeys.indexOf(field.name()) + 1);
+                partitionFieldCount++;
             } else {
                 // else if the map[i] is positive, the related column is 
stored in the file-read row
-                map[i] = (i - pCount) + 1;
-                fieldsWithoutPartition.add(dataFields.get(i));
+                map[i] = (i - partitionFieldCount) + 1;
+                fieldsWithoutPartition.add(field);
             }
         }
 
         Pair<int[], RowType> partitionMapping =
                 fieldsWithoutPartition.size() == dataFields.size()
                         ? null
-                        : Pair.of(
-                                map,
-                                
dataSchema.projectedLogicalRowType(dataSchema.partitionKeys()));
+                        : Pair.of(map, partitionType);
         return Pair.of(partitionMapping, fieldsWithoutPartition);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index b7ea520e49..d3b6881246 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.format.FormatReadBuilder;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -42,6 +43,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
+
 /**
  * A file format table refers to a directory that contains multiple files of 
the same format, where
  * operations on this table allow for reading or writing to these files, 
facilitating the retrieval
@@ -86,8 +89,8 @@ public interface FormatTable extends Table {
     }
 
     /** Create a new builder for {@link FormatTable}. */
-    static FormatTable.Builder builder() {
-        return new FormatTable.Builder();
+    static Builder builder() {
+        return new Builder();
     }
 
     /** Builder for {@link FormatTable}. */
@@ -98,52 +101,52 @@ public interface FormatTable extends Table {
         private RowType rowType;
         private List<String> partitionKeys;
         private String location;
-        private FormatTable.Format format;
+        private Format format;
         private Map<String, String> options;
         @Nullable private String comment;
 
-        public FormatTable.Builder fileIO(FileIO fileIO) {
+        public Builder fileIO(FileIO fileIO) {
             this.fileIO = fileIO;
             return this;
         }
 
-        public FormatTable.Builder identifier(Identifier identifier) {
+        public Builder identifier(Identifier identifier) {
             this.identifier = identifier;
             return this;
         }
 
-        public FormatTable.Builder rowType(RowType rowType) {
+        public Builder rowType(RowType rowType) {
             this.rowType = rowType;
             return this;
         }
 
-        public FormatTable.Builder partitionKeys(List<String> partitionKeys) {
+        public Builder partitionKeys(List<String> partitionKeys) {
             this.partitionKeys = partitionKeys;
             return this;
         }
 
-        public FormatTable.Builder location(String location) {
+        public Builder location(String location) {
             this.location = location;
             return this;
         }
 
-        public FormatTable.Builder format(FormatTable.Format format) {
+        public Builder format(Format format) {
             this.format = format;
             return this;
         }
 
-        public FormatTable.Builder options(Map<String, String> options) {
+        public Builder options(Map<String, String> options) {
             this.options = options;
             return this;
         }
 
-        public FormatTable.Builder comment(@Nullable String comment) {
+        public Builder comment(@Nullable String comment) {
             this.comment = comment;
             return this;
         }
 
         public FormatTable build() {
-            return new FormatTable.FormatTableImpl(
+            return new FormatTableImpl(
                     fileIO, identifier, rowType, partitionKeys, location, 
format, options, comment);
         }
     }
@@ -151,6 +154,8 @@ public interface FormatTable extends Table {
     /** An implementation for {@link FormatTable}. */
     class FormatTableImpl implements FormatTable {
 
+        private static final long serialVersionUID = 1L;
+
         private final FileIO fileIO;
         private final Identifier identifier;
         private final RowType rowType;
@@ -245,6 +250,25 @@ public interface FormatTable extends Table {
         }
     }
 
+    @Override
+    default ReadBuilder newReadBuilder() {
+        return new FormatReadBuilder(this);
+    }
+
+    @Override
+    default BatchWriteBuilder newBatchWriteBuilder() {
+        throw new UnsupportedOperationException();
+    }
+
+    default RowType partitionType() {
+        return rowType().project(partitionKeys());
+    }
+
+    default String defaultPartName() {
+        return options()
+                .getOrDefault(PARTITION_DEFAULT_NAME.key(), 
PARTITION_DEFAULT_NAME.defaultValue());
+    }
+
     // ===================== Unsupported ===============================
 
     @Override
@@ -352,16 +376,6 @@ public interface FormatTable extends Table {
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    default ReadBuilder newReadBuilder() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    default BatchWriteBuilder newBatchWriteBuilder() {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     default StreamWriteBuilder newStreamWriteBuilder() {
         throw new UnsupportedOperationException();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
new file mode 100644
index 0000000000..86b26a1351
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.Split;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** {@link FormatDataSplit} for format table. */
+public class FormatDataSplit implements Split {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Path filePath;
+    private final long offset;
+    private final long length;
+    @Nullable private final BinaryRow partition;
+
+    public FormatDataSplit(Path filePath, long offset, long length, @Nullable 
BinaryRow partition) {
+        this.filePath = filePath;
+        this.offset = offset;
+        this.length = length;
+        this.partition = partition;
+    }
+
+    public Path filePath() {
+        return this.filePath;
+    }
+
+    public Path dataPath() {
+        return filePath;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public long length() {
+        return length;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    @Override
+    public long rowCount() {
+        return -1;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FormatDataSplit that = (FormatDataSplit) o;
+        return offset == that.offset
+                && length == that.length
+                && Objects.equals(filePath, that.filePath)
+                && Objects.equals(partition, that.partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(filePath, offset, length, partition);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
new file mode 100644
index 0000000000..f664acaf85
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormatDiscover;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileRecordReader;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.partition.PartitionUtils;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+
+/** {@link ReadBuilder} for {@link FormatTable}. */
+public class FormatReadBuilder implements ReadBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private CoreOptions options;
+    private FormatTable table;
+    private RowType readType;
+
+    private final FileFormatDiscover formatDiscover;
+
+    @Nullable private Predicate filter;
+    @Nullable private PartitionPredicate partitionFilter;
+    @Nullable private Integer limit;
+
+    public FormatReadBuilder(FormatTable table) {
+        this.table = table;
+        this.readType = this.table.rowType();
+        this.options = new CoreOptions(table.options());
+        this.formatDiscover = FileFormatDiscover.of(this.options);
+    }
+
+    @Override
+    public String tableName() {
+        return this.table.name();
+    }
+
+    @Override
+    public RowType readType() {
+        return this.readType;
+    }
+
+    @Override
+    public ReadBuilder withFilter(Predicate predicate) {
+        if (this.filter == null) {
+            this.filter = predicate;
+        } else {
+            this.filter = PredicateBuilder.and(this.filter, predicate);
+        }
+        return this;
+    }
+
+    @Override
+    public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
+        if (partitionSpec != null) {
+            RowType partitionType = 
table.rowType().project(table.partitionKeys());
+            PartitionPredicate partitionPredicate =
+                    fromPredicate(
+                            partitionType,
+                            createPartitionPredicate(
+                                    partitionSpec, partitionType, 
table.defaultPartName()));
+            withPartitionFilter(partitionPredicate);
+        }
+        return this;
+    }
+
+    @Override
+    public ReadBuilder withPartitionFilter(PartitionPredicate 
partitionPredicate) {
+        this.partitionFilter = partitionPredicate;
+        return this;
+    }
+
+    @Override
+    public ReadBuilder withReadType(RowType readType) {
+        this.readType = readType;
+        return this;
+    }
+
+    @Override
+    public ReadBuilder withProjection(int[] projection) {
+        if (projection == null) {
+            return this;
+        }
+        return withReadType(readType().project(projection));
+    }
+
+    @Override
+    public ReadBuilder withLimit(int limit) {
+        this.limit = limit;
+        return this;
+    }
+
+    @Override
+    public TableScan newScan() {
+        return new FormatTableScan(table, partitionFilter, limit);
+    }
+
+    @Override
+    public TableRead newRead() {
+        return new FormatTableRead(readType(), this, filter, limit);
+    }
+
+    protected RecordReader<InternalRow> createReader(FormatDataSplit 
dataSplit) throws IOException {
+        Path filePath = dataSplit.dataPath();
+        FormatReaderContext formatReaderContext =
+                new FormatReaderContext(table.fileIO(), filePath, 
dataSplit.length(), null);
+
+        FormatReaderFactory readerFactory =
+                formatDiscover
+                        .discover(options.formatType())
+                        .createReaderFactory(
+                                table.rowType(), readType(), 
PredicateBuilder.splitAnd(filter));
+
+        Pair<int[], RowType> partitionMapping =
+                PartitionUtils.getPartitionMapping(
+                        table.partitionKeys(), readType().getFields(), 
table.partitionType());
+
+        FileRecordReader<InternalRow> fileRecordReader =
+                new DataFileRecordReader(
+                        readType(),
+                        readerFactory,
+                        formatReaderContext,
+                        null,
+                        null,
+                        PartitionUtils.create(partitionMapping, 
dataSplit.partition()),
+                        false,
+                        null,
+                        0,
+                        Collections.emptyMap());
+        return fileRecordReader;
+    }
+
+    // ===================== Unsupported ===============================
+    @Override
+    public ReadBuilder withTopN(TopN topN) {
+        throw new UnsupportedOperationException("TopN is not supported for 
FormatTable.");
+    }
+
+    @Override
+    public ReadBuilder dropStats() {
+        throw new UnsupportedOperationException("Format Table does not support 
dropStats.");
+    }
+
+    @Override
+    public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
+        throw new UnsupportedOperationException("Format Table does not support 
withBucketFilter.");
+    }
+
+    @Override
+    public ReadBuilder withBucket(int bucket) {
+        throw new UnsupportedOperationException("Format Table does not support 
withBucket.");
+    }
+
+    @Override
+    public ReadBuilder withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        throw new UnsupportedOperationException("Format Table does not support 
withShard.");
+    }
+
+    @Override
+    public StreamTableScan newStreamScan() {
+        throw new UnsupportedOperationException("Format Table does not support 
stream scan.");
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java
new file mode 100644
index 0000000000..2fbe3f5629
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateProjectionConverter;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A {@link TableRead} implementation for {@link FormatTable}. */
+public class FormatTableRead implements TableRead {
+
+    private RowType readType;
+    private boolean executeFilter = false;
+    private Predicate predicate;
+    private FormatReadBuilder read;
+    private Integer limit;
+
+    public FormatTableRead(
+            RowType readType, FormatReadBuilder read, Predicate predicate, 
Integer limit) {
+        this.readType = readType;
+        this.read = read;
+        this.predicate = predicate;
+        this.limit = limit;
+    }
+
+    @Override
+    public TableRead withMetricRegistry(MetricRegistry registry) {
+        return this;
+    }
+
+    @Override
+    public TableRead executeFilter() {
+        this.executeFilter = true;
+        return this;
+    }
+
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        return this;
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        FormatDataSplit dataSplit = (FormatDataSplit) split;
+        RecordReader<InternalRow> reader = read.createReader(dataSplit);
+        if (executeFilter) {
+            reader = executeFilter(reader);
+        }
+        if (limit != null && limit > 0) {
+            reader = applyLimit(reader, limit);
+        }
+
+        return reader;
+    }
+
+    private RecordReader<InternalRow> executeFilter(RecordReader<InternalRow> 
reader) {
+        if (predicate == null) {
+            return reader;
+        }
+
+        Predicate predicate = this.predicate;
+        if (readType != null) {
+            int[] projection = 
readType.getFieldIndices(readType.getFieldNames());
+            Optional<Predicate> optional =
+                    predicate.visit(new 
PredicateProjectionConverter(projection));
+            if (!optional.isPresent()) {
+                return reader;
+            }
+            predicate = optional.get();
+        }
+
+        Predicate finalFilter = predicate;
+        return reader.filter(finalFilter::test);
+    }
+
+    private RecordReader<InternalRow> applyLimit(RecordReader<InternalRow> 
reader, int limit) {
+        return new RecordReader<InternalRow>() {
+            private final AtomicLong recordCount = new AtomicLong(0);
+
+            @Override
+            public RecordIterator<InternalRow> readBatch() throws IOException {
+                if (recordCount.get() >= limit) {
+                    return null;
+                }
+                RecordIterator<InternalRow> iterator = reader.readBatch();
+                if (iterator == null) {
+                    return null;
+                }
+                return new RecordIterator<InternalRow>() {
+                    @Override
+                    public InternalRow next() throws IOException {
+                        if (recordCount.get() >= limit) {
+                            return null;
+                        }
+                        InternalRow next = iterator.next();
+                        if (next != null) {
+                            recordCount.incrementAndGet();
+                        }
+                        return next;
+                    }
+
+                    @Override
+                    public void releaseBatch() {
+                        iterator.releaseBatch();
+                    }
+                };
+            }
+
+            @Override
+            public void close() throws IOException {
+                reader.close();
+            }
+        };
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
new file mode 100644
index 0000000000..3555f76836
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PartitionPathUtils;
+import org.apache.paimon.utils.TypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/** {@link TableScan} for {@link FormatTable}. */
+public class FormatTableScan implements InnerTableScan {
+
+    private final FormatTable table;
+    @Nullable private PartitionPredicate partitionFilter;
+    @Nullable private Integer limit;
+
+    public FormatTableScan(
+            FormatTable table,
+            @Nullable PartitionPredicate partitionFilter,
+            @Nullable Integer limit) {
+        this.table = table;
+        this.partitionFilter = partitionFilter;
+        this.limit = limit;
+    }
+
+    @Override
+    public InnerTableScan withPartitionFilter(PartitionPredicate 
partitionPredicate) {
+        this.partitionFilter = partitionPredicate;
+        return this;
+    }
+
+    @Override
+    public Plan plan() {
+        return new FormatTableScanPlan();
+    }
+
+    @Override
+    public List<PartitionEntry> listPartitionEntries() {
+        List<Pair<LinkedHashMap<String, String>, Path>> partition2Paths =
+                PartitionPathUtils.searchPartSpecAndPaths(
+                        table.fileIO(), new Path(table.location()), 
table.partitionKeys().size());
+        List<PartitionEntry> partitionEntries = new ArrayList<>();
+        for (Pair<LinkedHashMap<String, String>, Path> partition2Path : 
partition2Paths) {
+            BinaryRow row = createPartitionRow(partition2Path.getKey());
+            partitionEntries.add(new PartitionEntry(row, -1L, -1L, -1L, -1L));
+        }
+        return partitionEntries;
+    }
+
+    @Override
+    public InnerTableScan withFilter(Predicate predicate) {
+        throw new UnsupportedOperationException("Filter is not supported for 
FormatTable.");
+    }
+
+    public static boolean isDataFileName(String fileName) {
+        return fileName != null && !fileName.startsWith(".") && 
!fileName.startsWith("_");
+    }
+
+    private BinaryRow createPartitionRow(LinkedHashMap<String, String> 
partitionSpec) {
+        RowType partitionRowType = table.partitionType();
+        List<DataField> fields = partitionRowType.getFields();
+
+        // Create value setters for each field type
+        List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
+        for (DataField field : fields) {
+            valueSetters.add(BinaryWriter.createValueSetter(field.type()));
+        }
+
+        // Create binary row to hold partition values
+        BinaryRow binaryRow = new BinaryRow(fields.size());
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+        // Fill in partition values
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i).name();
+            String partitionValue = partitionSpec.get(fieldName);
+
+            if (partitionValue == null || 
partitionValue.equals(table.defaultPartName())) {
+                // Set null for default partition name or missing partition
+                binaryRowWriter.setNullAt(i);
+            } else {
+                // Convert string value to appropriate type and set it
+                Object value = TypeUtils.castFromString(partitionValue, 
fields.get(i).type());
+                valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            }
+        }
+
+        binaryRowWriter.complete();
+        return binaryRow;
+    }
+
+    private class FormatTableScanPlan implements Plan {
+        @Override
+        public List<Split> splits() {
+            List<Split> splits = new ArrayList<>();
+            try {
+                FileIO fileIO = table.fileIO();
+                if (!table.partitionKeys().isEmpty()) {
+                    List<Pair<LinkedHashMap<String, String>, Path>> 
partition2Paths =
+                            PartitionPathUtils.searchPartSpecAndPaths(
+                                    fileIO,
+                                    new Path(table.location()),
+                                    table.partitionKeys().size());
+                    for (Pair<LinkedHashMap<String, String>, Path> 
partition2Path :
+                            partition2Paths) {
+                        LinkedHashMap<String, String> partitionSpec = 
partition2Path.getKey();
+                        BinaryRow partitionRow = 
createPartitionRow(partitionSpec);
+                        if (partitionFilter != null && 
partitionFilter.test(partitionRow)) {
+                            splits.addAll(
+                                    getSplits(fileIO, 
partition2Path.getValue(), partitionRow));
+                        }
+                    }
+                } else {
+                    splits.addAll(getSplits(fileIO, new 
Path(table.location()), null));
+                }
+                if (limit != null) {
+                    if (limit <= 0) {
+                        return new ArrayList<>();
+                    }
+                    if (splits.size() > limit) {
+                        return splits.subList(0, limit);
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to scan files", e);
+            }
+            return splits;
+        }
+    }
+
+    private List<Split> getSplits(FileIO fileIO, Path path, BinaryRow 
partition)
+            throws IOException {
+        List<Split> splits = new ArrayList<>();
+        FileStatus[] files = fileIO.listFiles(path, true);
+        for (FileStatus file : files) {
+            if (isDataFileName(file.getPath().getName())) {
+                FormatDataSplit split =
+                        new FormatDataSplit(file.getPath(), 0, file.getLen(), 
partition);
+                splits.add(split);
+            }
+        }
+        return splits;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 32135518ba..cb718fc73a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -173,6 +173,40 @@ public class FileStorePathFactory {
                 createExternalPathProvider(partition, bucket));
     }
 
+    public DataFilePathFactory createFormatTableDataFilePathFactory(BinaryRow 
partition) {
+        return new DataFilePathFactory(
+                partitionPath(partition),
+                formatIdentifier,
+                dataFilePrefix,
+                changelogFilePrefix,
+                fileSuffixIncludeCompression,
+                fileCompression,
+                createExternalPartitionPathProvider(partition));
+    }
+
+    private ExternalPathProvider createExternalPartitionPathProvider(BinaryRow 
partition) {
+        if (externalPaths == null || externalPaths.isEmpty()) {
+            return null;
+        }
+
+        return new ExternalPathProvider(externalPaths, 
partitionPath(partition));
+    }
+
+    public Path partitionPath(BinaryRow partition) {
+        Path relativeBucketPath = null;
+        String partitionPath = getPartitionString(partition);
+        if (!partitionPath.isEmpty()) {
+            relativeBucketPath = new Path(partitionPath);
+        }
+        if (dataFilePathDirectory != null) {
+            relativeBucketPath =
+                    relativeBucketPath != null
+                            ? new Path(dataFilePathDirectory, 
relativeBucketPath)
+                            : new Path(dataFilePathDirectory);
+        }
+        return relativeBucketPath != null ? new Path(root, relativeBucketPath) 
: root;
+    }
+
     @Nullable
     private ExternalPathProvider createExternalPathProvider(BinaryRow 
partition, int bucket) {
         if (externalPaths == null || externalPaths.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index ff4084f9d4..26aa0d77a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.LinkedHashMap;
@@ -235,4 +238,62 @@ public class PartitionPathUtils {
 
         return fullPartSpec;
     }
+
+    /**
+     * Search all partitions in this path.
+     *
+     * @param path search path.
+     * @param partitionNumber partition number, it will affect path structure.
+     * @return all partition specs to its path.
+     */
+    public static List<Pair<LinkedHashMap<String, String>, Path>> 
searchPartSpecAndPaths(
+            FileIO fileIO, Path path, int partitionNumber) {
+        FileStatus[] generatedParts = getFileStatusRecurse(path, 
partitionNumber, fileIO);
+        List<Pair<LinkedHashMap<String, String>, Path>> ret = new 
ArrayList<>();
+        for (FileStatus part : generatedParts) {
+            // ignore hidden file
+            if (isHiddenFile(part)) {
+                continue;
+            }
+            ret.add(Pair.of(extractPartitionSpecFromPath(part.getPath()), 
part.getPath()));
+        }
+        return ret;
+    }
+
+    private static FileStatus[] getFileStatusRecurse(Path path, int 
expectLevel, FileIO fileIO) {
+        ArrayList<FileStatus> result = new ArrayList<>();
+
+        try {
+            FileStatus fileStatus = fileIO.getFileStatus(path);
+            listStatusRecursively(fileIO, fileStatus, 0, expectLevel, result);
+        } catch (IOException ignore) {
+            return new FileStatus[0];
+        }
+
+        return result.toArray(new FileStatus[0]);
+    }
+
+    private static void listStatusRecursively(
+            FileIO fileIO,
+            FileStatus fileStatus,
+            int level,
+            int expectLevel,
+            List<FileStatus> results)
+            throws IOException {
+        if (expectLevel == level) {
+            results.add(fileStatus);
+            return;
+        }
+
+        if (fileStatus.isDir()) {
+            for (FileStatus stat : fileIO.listStatus(fileStatus.getPath())) {
+                listStatusRecursively(fileIO, stat, level + 1, expectLevel, 
results);
+            }
+        }
+    }
+
+    private static boolean isHiddenFile(FileStatus fileStatus) {
+        String name = fileStatus.getPath().getName();
+        return name.startsWith("_") || name.startsWith(".");
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 2e525a515f..192e8f8171 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -23,11 +23,28 @@ import org.apache.paimon.PagedList;
 import org.apache.paimon.TableType;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.HadoopCompressionType;
+import org.apache.paimon.format.SupportsDirectWrite;
+import org.apache.paimon.format.csv.CsvFileFormatFactory;
+import org.apache.paimon.format.json.JsonFileFormatFactory;
+import org.apache.paimon.format.parquet.ParquetFileFormatFactory;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
@@ -37,11 +54,14 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.system.AllTableOptionsTable;
 import org.apache.paimon.table.system.CatalogOptionsTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
 
@@ -54,13 +74,19 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
@@ -555,6 +581,176 @@ public abstract class CatalogTestBase {
                 .isInstanceOf(RuntimeException.class);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testFormatTableRead(boolean partitioned) throws Exception {
+        if (!supportsFormatTable()) {
+            return;
+        }
+        Random random = new Random();
+        String dbName = "test_db";
+        catalog.createDatabase(dbName, true);
+        int partitionValue = 10;
+        HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f1", DataTypes.INT());
+        schemaBuilder.column("f2", DataTypes.INT());
+        schemaBuilder.column("dt", DataTypes.INT());
+        schemaBuilder.option("type", "format-table");
+        schemaBuilder.option("target-file-size", "1 kb");
+        schemaBuilder.option("file.compression", compressionType.value());
+        String[] formats = {"csv", "parquet", "json"};
+        for (String format : formats) {
+            if (partitioned) {
+                schemaBuilder.partitionKeys("dt");
+            }
+            Identifier identifier = Identifier.create(dbName, "table_" + 
format);
+            schemaBuilder.option("file.format", format);
+            catalog.createTable(identifier, schemaBuilder.build(), true);
+            FormatTable table = (FormatTable) catalog.getTable(identifier);
+            int[] projection = new int[] {1, 2};
+            PredicateBuilder builder = new 
PredicateBuilder(table.rowType().project(projection));
+            Predicate predicate = builder.greaterOrEqual(0, 10);
+            int size = 5;
+            int checkSize = 3;
+            InternalRow[] datas = new InternalRow[size];
+            InternalRow[] checkDatas = new InternalRow[checkSize];
+            for (int j = 0; j < size; j++) {
+                int f1 = random.nextInt();
+                int f2 = j < checkSize ? random.nextInt(10) + 10 : 
random.nextInt(10);
+                datas[j] = GenericRow.of(f1, f2, partitionValue);
+                if (j < checkSize) {
+                    checkDatas[j] = GenericRow.of(f2, partitionValue);
+                }
+            }
+            InternalRow dataWithDiffPartition =
+                    GenericRow.of(random.nextInt(), random.nextInt(), 11);
+            FormatWriterFactory factory =
+                    (buildFileFormatFactory(format)
+                                    .create(
+                                            new 
FileFormatFactory.FormatContext(
+                                                    new Options(), 1024, 
1024)))
+                            .createWriterFactory(table.rowType());
+            Map<String, String> partitionSpec = null;
+            if (partitioned) {
+                Path partitionPath =
+                        new Path(String.format("%s/%s", table.location(), 
"dt=" + partitionValue));
+                DataFilePathFactory dataFilePathFactory =
+                        new DataFilePathFactory(
+                                partitionPath,
+                                format,
+                                "data",
+                                "change",
+                                true,
+                                compressionType.value(),
+                                null);
+                Path diffPartitionPath =
+                        new Path(String.format("%s/%s", table.location(), 
"dt=" + 11));
+                DataFilePathFactory diffPartitionPathFactory =
+                        new DataFilePathFactory(
+                                diffPartitionPath,
+                                format,
+                                "data",
+                                "change",
+                                true,
+                                compressionType.value(),
+                                null);
+                write(factory, dataFilePathFactory.newPath(), 
compressionType.value(), datas);
+                write(
+                        factory,
+                        diffPartitionPathFactory.newPath(),
+                        compressionType.value(),
+                        dataWithDiffPartition);
+                partitionSpec = new HashMap<>();
+                partitionSpec.put("dt", "" + partitionValue);
+            } else {
+                DataFilePathFactory dataFilePathFactory =
+                        new DataFilePathFactory(
+                                new Path(table.location()),
+                                format,
+                                "data",
+                                "change",
+                                true,
+                                compressionType.value(),
+                                null);
+                write(factory, dataFilePathFactory.newPath(), 
compressionType.value(), datas);
+            }
+            List<InternalRow> readData = read(table, predicate, projection, 
partitionSpec, null);
+            Integer limit = checkSize - 1;
+            List<InternalRow> readLimitData =
+                    read(table, predicate, projection, partitionSpec, limit);
+            assertThat(readLimitData).hasSize(limit);
+            assertThat(readData).containsExactlyInAnyOrder(checkDatas);
+            catalog.dropTable(Identifier.create(dbName, format), true);
+        }
+    }
+
    /**
     * Instantiates the {@link FileFormatFactory} matching the given format name.
     *
     * @param format one of "csv", "parquet" or "json"
     * @throws IllegalArgumentException if the format is not one of the supported names
     */
    protected FileFormatFactory buildFileFormatFactory(String format) {
        switch (format) {
            case "csv":
                return new CsvFileFormatFactory();
            case "parquet":
                return new ParquetFileFormatFactory();
            case "json":
                return new JsonFileFormatFactory();
            default:
                throw new IllegalArgumentException("Unsupported format: " + format);
        }
    }
+
+    protected void write(
+            FormatWriterFactory factory, Path file, String compression, 
InternalRow... rows)
+            throws IOException {
+        FormatWriter writer;
+        PositionOutputStream out = null;
+        if (factory instanceof SupportsDirectWrite) {
+            writer = ((SupportsDirectWrite) factory).create(fileIO, file, 
compression);
+        } else {
+            out = fileIO.newOutputStream(file, true);
+            writer = factory.create(out, compression);
+        }
+        for (InternalRow row : rows) {
+            writer.addElement(row);
+        }
+        writer.close();
+        if (out != null) {
+            out.close();
+        }
+    }
+
+    @SafeVarargs
+    protected final List<InternalRow> read(
+            Table table,
+            Predicate predicate,
+            @Nullable int[] projection,
+            @Nullable Map<String, String> partitionSpec,
+            @Nullable Integer limit,
+            Pair<ConfigOption<?>, String>... dynamicOptions)
+            throws Exception {
+        Map<String, String> options = new HashMap<>();
+        for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
+            options.put(pair.getKey().key(), pair.getValue());
+        }
+        table = table.copy(options);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        if (projection != null) {
+            readBuilder.withProjection(projection);
+        }
+        readBuilder.withPartitionFilter(partitionSpec);
+        TableScan scan = readBuilder.newScan();
+        readBuilder.withFilter(predicate);
+        if (limit != null) {
+            readBuilder.withLimit(limit);
+        }
+        try (RecordReader<InternalRow> reader =
+                
readBuilder.newRead().executeFilter().createReader(scan.plan())) {
+            InternalRowSerializer serializer = new 
InternalRowSerializer(readBuilder.readType());
+            List<InternalRow> rows = new ArrayList<>();
+            reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+            return rows;
+        }
+    }
+
     @Test
     public void testGetTable() throws Exception {
         catalog.createDatabase("test_db", false);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
new file mode 100644
index 0000000000..d992796875
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FormatDataSplit}. */
+public class FormatDataSplitTest {
+
+    @Test
+    public void testSerializeAndDeserialize() throws IOException, 
ClassNotFoundException {
+        // Create test data
+        Path filePath = new Path("/test/path/file.parquet");
+        RowType rowType = RowType.builder().field("id", new IntType()).build();
+        long modificationTime = System.currentTimeMillis();
+
+        // Create a predicate for testing
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = builder.equal(0, 5);
+
+        // Create FormatDataSplit
+        FormatDataSplit split =
+                new FormatDataSplit(
+                        filePath, 100L, // offset
+                        1024L, // length
+                        null);
+
+        // Test Java serialization
+        byte[] serialized = InstantiationUtil.serializeObject(split);
+        FormatDataSplit deserialized =
+                InstantiationUtil.deserializeObject(serialized, 
getClass().getClassLoader());
+
+        // Verify the deserialized object
+        assertThat(deserialized.filePath()).isEqualTo(filePath);
+        assertThat(deserialized.offset()).isEqualTo(100L);
+        assertThat(deserialized.length()).isEqualTo(1024L);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
new file mode 100644
index 0000000000..326a9138f6
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link FormatTableScan}. */
+class FormatTableScanTest {
+
+    @ParameterizedTest
+    @ValueSource(
+            strings = {
+                "File.txt",
+                "file.txt",
+                "123file.txt",
+                "F",
+                "File-1.txt",
+                "a",
+                "0",
+                "9test",
+                "Test_file.log"
+            })
+    @DisplayName("Test valid filenames that should return true")
+    void testValidDataFileNames(String fileName) {
+        assertTrue(
+                FormatTableScan.isDataFileName(fileName),
+                "Filename '" + fileName + "' should be valid");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {".hidden", "_file.txt"})
+    @DisplayName("Test invalid filenames that should return false")
+    void testInvalidDataFileNames(String fileName) {
+        assertFalse(
+                FormatTableScan.isDataFileName(fileName),
+                "Filename '" + fileName + "' should be invalid");
+    }
+
+    @Test
+    @DisplayName("Test null input should return false")
+    void testNullInput() {
+        assertFalse(FormatTableScan.isDataFileName(null), "Null input should 
return false");
+    }
+}

Reply via email to