This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7d2833d2ca [core] support format table read (#6095)
7d2833d2ca is described below
commit 7d2833d2ca1aa9c6f8fc7e29b19d7d581b0e7eb6
Author: jerry <[email protected]>
AuthorDate: Wed Sep 3 11:50:38 2025 +0800
[core] support format table read (#6095)
---
.../apache/paimon/partition/PartitionUtils.java | 37 ++--
.../java/org/apache/paimon/table/FormatTable.java | 58 +++---
.../paimon/table/format/FormatDataSplit.java | 90 +++++++++
.../paimon/table/format/FormatReadBuilder.java | 205 +++++++++++++++++++++
.../paimon/table/format/FormatTableRead.java | 142 ++++++++++++++
.../paimon/table/format/FormatTableScan.java | 181 ++++++++++++++++++
.../apache/paimon/utils/FileStorePathFactory.java | 34 ++++
.../apache/paimon/utils/PartitionPathUtils.java | 61 ++++++
.../org/apache/paimon/catalog/CatalogTestBase.java | 196 ++++++++++++++++++++
.../paimon/table/format/FormatDataSplitTest.java | 65 +++++++
.../paimon/table/format/FormatTableScanTest.java | 66 +++++++
11 files changed, 1102 insertions(+), 33 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java
index 3f8c56467b..bfa1e2cb1d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionUtils.java
@@ -40,31 +40,46 @@ public class PartitionUtils {
if (dataSchema.partitionKeys().isEmpty()) {
return Pair.of(null, dataFields);
}
+        return getPartitionMapping2fieldsWithoutPartition(
+                dataSchema.partitionKeys(),
+                dataFields,
+                dataSchema.projectedLogicalRowType(dataSchema.partitionKeys()));
+    }
- List<String> partitionNames = dataSchema.partitionKeys();
- List<DataField> fieldsWithoutPartition = new ArrayList<>();
+    public static Pair<int[], RowType> getPartitionMapping(
+            List<String> partitionKeys, List<DataField> dataFields, RowType partitionType) {
+        return getPartitionMapping2fieldsWithoutPartition(partitionKeys, dataFields, partitionType)
+                .getLeft();
+    }
+
+    public static Pair<Pair<int[], RowType>, List<DataField>>
+            getPartitionMapping2fieldsWithoutPartition(
+                    List<String> partitionKeys, List<DataField> dataFields, RowType partitionType) {
+ if (partitionKeys.isEmpty()) {
+ return Pair.of(null, dataFields);
+ }
+ List<DataField> fieldsWithoutPartition = new ArrayList<>();
int[] map = new int[dataFields.size() + 1];
- int pCount = 0;
+ int partitionFieldCount = 0;
+
for (int i = 0; i < dataFields.size(); i++) {
DataField field = dataFields.get(i);
- if (partitionNames.contains(field.name())) {
+ if (partitionKeys.contains(field.name())) {
             // if the map[i] is minus, represent the related column is stored in partition row
- map[i] = -(partitionNames.indexOf(field.name()) + 1);
- pCount++;
+ map[i] = -(partitionKeys.indexOf(field.name()) + 1);
+ partitionFieldCount++;
} else {
             // else if the map[i] is positive, the related column is stored in the file-read row
- map[i] = (i - pCount) + 1;
- fieldsWithoutPartition.add(dataFields.get(i));
+ map[i] = (i - partitionFieldCount) + 1;
+ fieldsWithoutPartition.add(field);
}
}
Pair<int[], RowType> partitionMapping =
fieldsWithoutPartition.size() == dataFields.size()
? null
-                        : Pair.of(
-                                map,
-                                dataSchema.projectedLogicalRowType(dataSchema.partitionKeys()));
+                        : Pair.of(map, partitionType);
return Pair.of(partitionMapping, fieldsWithoutPartition);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index b7ea520e49..d3b6881246 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.format.FormatReadBuilder;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
@@ -42,6 +43,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
+
/**
* A file format table refers to a directory that contains multiple files of
the same format, where
* operations on this table allow for reading or writing to these files,
facilitating the retrieval
@@ -86,8 +89,8 @@ public interface FormatTable extends Table {
}
/** Create a new builder for {@link FormatTable}. */
- static FormatTable.Builder builder() {
- return new FormatTable.Builder();
+ static Builder builder() {
+ return new Builder();
}
/** Builder for {@link FormatTable}. */
@@ -98,52 +101,52 @@ public interface FormatTable extends Table {
private RowType rowType;
private List<String> partitionKeys;
private String location;
- private FormatTable.Format format;
+ private Format format;
private Map<String, String> options;
@Nullable private String comment;
- public FormatTable.Builder fileIO(FileIO fileIO) {
+ public Builder fileIO(FileIO fileIO) {
this.fileIO = fileIO;
return this;
}
- public FormatTable.Builder identifier(Identifier identifier) {
+ public Builder identifier(Identifier identifier) {
this.identifier = identifier;
return this;
}
- public FormatTable.Builder rowType(RowType rowType) {
+ public Builder rowType(RowType rowType) {
this.rowType = rowType;
return this;
}
- public FormatTable.Builder partitionKeys(List<String> partitionKeys) {
+ public Builder partitionKeys(List<String> partitionKeys) {
this.partitionKeys = partitionKeys;
return this;
}
- public FormatTable.Builder location(String location) {
+ public Builder location(String location) {
this.location = location;
return this;
}
- public FormatTable.Builder format(FormatTable.Format format) {
+ public Builder format(Format format) {
this.format = format;
return this;
}
- public FormatTable.Builder options(Map<String, String> options) {
+ public Builder options(Map<String, String> options) {
this.options = options;
return this;
}
- public FormatTable.Builder comment(@Nullable String comment) {
+ public Builder comment(@Nullable String comment) {
this.comment = comment;
return this;
}
public FormatTable build() {
- return new FormatTable.FormatTableImpl(
+ return new FormatTableImpl(
fileIO, identifier, rowType, partitionKeys, location,
format, options, comment);
}
}
@@ -151,6 +154,8 @@ public interface FormatTable extends Table {
/** An implementation for {@link FormatTable}. */
class FormatTableImpl implements FormatTable {
+ private static final long serialVersionUID = 1L;
+
private final FileIO fileIO;
private final Identifier identifier;
private final RowType rowType;
@@ -245,6 +250,25 @@ public interface FormatTable extends Table {
}
}
+ @Override
+ default ReadBuilder newReadBuilder() {
+ return new FormatReadBuilder(this);
+ }
+
+ @Override
+ default BatchWriteBuilder newBatchWriteBuilder() {
+ throw new UnsupportedOperationException();
+ }
+
+ default RowType partitionType() {
+ return rowType().project(partitionKeys());
+ }
+
+ default String defaultPartName() {
+ return options()
+ .getOrDefault(PARTITION_DEFAULT_NAME.key(),
PARTITION_DEFAULT_NAME.defaultValue());
+ }
+
// ===================== Unsupported ===============================
@Override
@@ -352,16 +376,6 @@ public interface FormatTable extends Table {
throw new UnsupportedOperationException();
}
- @Override
- default ReadBuilder newReadBuilder() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- default BatchWriteBuilder newBatchWriteBuilder() {
- throw new UnsupportedOperationException();
- }
-
@Override
default StreamWriteBuilder newStreamWriteBuilder() {
throw new UnsupportedOperationException();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
new file mode 100644
index 0000000000..86b26a1351
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.Split;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** {@link FormatDataSplit} for format table. */
+public class FormatDataSplit implements Split {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Path filePath;
+ private final long offset;
+ private final long length;
+ @Nullable private final BinaryRow partition;
+
+ public FormatDataSplit(Path filePath, long offset, long length, @Nullable
BinaryRow partition) {
+ this.filePath = filePath;
+ this.offset = offset;
+ this.length = length;
+ this.partition = partition;
+ }
+
+ public Path filePath() {
+ return this.filePath;
+ }
+
+ public Path dataPath() {
+ return filePath;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public long length() {
+ return length;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ @Override
+ public long rowCount() {
+ return -1;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FormatDataSplit that = (FormatDataSplit) o;
+ return offset == that.offset
+ && length == that.length
+ && Objects.equals(filePath, that.filePath)
+ && Objects.equals(partition, that.partition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(filePath, offset, length, partition);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
new file mode 100644
index 0000000000..f664acaf85
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormatDiscover;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileRecordReader;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.partition.PartitionUtils;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+
+/** {@link ReadBuilder} for {@link FormatTable}. */
+public class FormatReadBuilder implements ReadBuilder {
+
+ private static final long serialVersionUID = 1L;
+
+ private CoreOptions options;
+ private FormatTable table;
+ private RowType readType;
+
+ private final FileFormatDiscover formatDiscover;
+
+ @Nullable private Predicate filter;
+ @Nullable private PartitionPredicate partitionFilter;
+ @Nullable private Integer limit;
+
+ public FormatReadBuilder(FormatTable table) {
+ this.table = table;
+ this.readType = this.table.rowType();
+ this.options = new CoreOptions(table.options());
+ this.formatDiscover = FileFormatDiscover.of(this.options);
+ }
+
+ @Override
+ public String tableName() {
+ return this.table.name();
+ }
+
+ @Override
+ public RowType readType() {
+ return this.readType;
+ }
+
+ @Override
+ public ReadBuilder withFilter(Predicate predicate) {
+ if (this.filter == null) {
+ this.filter = predicate;
+ } else {
+ this.filter = PredicateBuilder.and(this.filter, predicate);
+ }
+ return this;
+ }
+
+ @Override
+ public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
+ if (partitionSpec != null) {
+ RowType partitionType =
table.rowType().project(table.partitionKeys());
+ PartitionPredicate partitionPredicate =
+ fromPredicate(
+ partitionType,
+ createPartitionPredicate(
+ partitionSpec, partitionType,
table.defaultPartName()));
+ withPartitionFilter(partitionPredicate);
+ }
+ return this;
+ }
+
+ @Override
+ public ReadBuilder withPartitionFilter(PartitionPredicate
partitionPredicate) {
+ this.partitionFilter = partitionPredicate;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder withReadType(RowType readType) {
+ this.readType = readType;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder withProjection(int[] projection) {
+ if (projection == null) {
+ return this;
+ }
+ return withReadType(readType().project(projection));
+ }
+
+ @Override
+ public ReadBuilder withLimit(int limit) {
+ this.limit = limit;
+ return this;
+ }
+
+ @Override
+ public TableScan newScan() {
+ return new FormatTableScan(table, partitionFilter, limit);
+ }
+
+ @Override
+ public TableRead newRead() {
+ return new FormatTableRead(readType(), this, filter, limit);
+ }
+
+ protected RecordReader<InternalRow> createReader(FormatDataSplit
dataSplit) throws IOException {
+ Path filePath = dataSplit.dataPath();
+ FormatReaderContext formatReaderContext =
+ new FormatReaderContext(table.fileIO(), filePath,
dataSplit.length(), null);
+
+ FormatReaderFactory readerFactory =
+ formatDiscover
+ .discover(options.formatType())
+ .createReaderFactory(
+ table.rowType(), readType(),
PredicateBuilder.splitAnd(filter));
+
+ Pair<int[], RowType> partitionMapping =
+ PartitionUtils.getPartitionMapping(
+ table.partitionKeys(), readType().getFields(),
table.partitionType());
+
+ FileRecordReader<InternalRow> fileRecordReader =
+ new DataFileRecordReader(
+ readType(),
+ readerFactory,
+ formatReaderContext,
+ null,
+ null,
+ PartitionUtils.create(partitionMapping,
dataSplit.partition()),
+ false,
+ null,
+ 0,
+ Collections.emptyMap());
+ return fileRecordReader;
+ }
+
+ // ===================== Unsupported ===============================
+ @Override
+ public ReadBuilder withTopN(TopN topN) {
+ throw new UnsupportedOperationException("TopN is not supported for
FormatTable.");
+ }
+
+ @Override
+ public ReadBuilder dropStats() {
+ throw new UnsupportedOperationException("Format Table does not support
dropStats.");
+ }
+
+ @Override
+ public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
+ throw new UnsupportedOperationException("Format Table does not support
withBucketFilter.");
+ }
+
+ @Override
+ public ReadBuilder withBucket(int bucket) {
+ throw new UnsupportedOperationException("Format Table does not support
withBucket.");
+ }
+
+ @Override
+ public ReadBuilder withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ throw new UnsupportedOperationException("Format Table does not support
withShard.");
+ }
+
+ @Override
+ public StreamTableScan newStreamScan() {
+ throw new UnsupportedOperationException("Format Table does not support
stream scan.");
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java
new file mode 100644
index 0000000000..2fbe3f5629
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateProjectionConverter;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A {@link TableRead} implementation for {@link FormatTable}. */
+public class FormatTableRead implements TableRead {
+
+ private RowType readType;
+ private boolean executeFilter = false;
+ private Predicate predicate;
+ private FormatReadBuilder read;
+ private Integer limit;
+
+ public FormatTableRead(
+ RowType readType, FormatReadBuilder read, Predicate predicate,
Integer limit) {
+ this.readType = readType;
+ this.read = read;
+ this.predicate = predicate;
+ this.limit = limit;
+ }
+
+ @Override
+ public TableRead withMetricRegistry(MetricRegistry registry) {
+ return this;
+ }
+
+ @Override
+ public TableRead executeFilter() {
+ this.executeFilter = true;
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ FormatDataSplit dataSplit = (FormatDataSplit) split;
+ RecordReader<InternalRow> reader = read.createReader(dataSplit);
+ if (executeFilter) {
+ reader = executeFilter(reader);
+ }
+ if (limit != null && limit > 0) {
+ reader = applyLimit(reader, limit);
+ }
+
+ return reader;
+ }
+
+ private RecordReader<InternalRow> executeFilter(RecordReader<InternalRow>
reader) {
+ if (predicate == null) {
+ return reader;
+ }
+
+ Predicate predicate = this.predicate;
+ if (readType != null) {
+ int[] projection =
readType.getFieldIndices(readType.getFieldNames());
+ Optional<Predicate> optional =
+ predicate.visit(new
PredicateProjectionConverter(projection));
+ if (!optional.isPresent()) {
+ return reader;
+ }
+ predicate = optional.get();
+ }
+
+ Predicate finalFilter = predicate;
+ return reader.filter(finalFilter::test);
+ }
+
+ private RecordReader<InternalRow> applyLimit(RecordReader<InternalRow>
reader, int limit) {
+ return new RecordReader<InternalRow>() {
+ private final AtomicLong recordCount = new AtomicLong(0);
+
+ @Override
+ public RecordIterator<InternalRow> readBatch() throws IOException {
+ if (recordCount.get() >= limit) {
+ return null;
+ }
+ RecordIterator<InternalRow> iterator = reader.readBatch();
+ if (iterator == null) {
+ return null;
+ }
+ return new RecordIterator<InternalRow>() {
+ @Override
+ public InternalRow next() throws IOException {
+ if (recordCount.get() >= limit) {
+ return null;
+ }
+ InternalRow next = iterator.next();
+ if (next != null) {
+ recordCount.incrementAndGet();
+ }
+ return next;
+ }
+
+ @Override
+ public void releaseBatch() {
+ iterator.releaseBatch();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ };
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
new file mode 100644
index 0000000000..3555f76836
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PartitionPathUtils;
+import org.apache.paimon.utils.TypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/** {@link TableScan} for {@link FormatTable}. */
+public class FormatTableScan implements InnerTableScan {
+
+ private final FormatTable table;
+ @Nullable private PartitionPredicate partitionFilter;
+ @Nullable private Integer limit;
+
+ public FormatTableScan(
+ FormatTable table,
+ @Nullable PartitionPredicate partitionFilter,
+ @Nullable Integer limit) {
+ this.table = table;
+ this.partitionFilter = partitionFilter;
+ this.limit = limit;
+ }
+
+ @Override
+ public InnerTableScan withPartitionFilter(PartitionPredicate
partitionPredicate) {
+ this.partitionFilter = partitionPredicate;
+ return this;
+ }
+
+ @Override
+ public Plan plan() {
+ return new FormatTableScanPlan();
+ }
+
+ @Override
+ public List<PartitionEntry> listPartitionEntries() {
+ List<Pair<LinkedHashMap<String, String>, Path>> partition2Paths =
+ PartitionPathUtils.searchPartSpecAndPaths(
+ table.fileIO(), new Path(table.location()),
table.partitionKeys().size());
+ List<PartitionEntry> partitionEntries = new ArrayList<>();
+ for (Pair<LinkedHashMap<String, String>, Path> partition2Path :
partition2Paths) {
+ BinaryRow row = createPartitionRow(partition2Path.getKey());
+ partitionEntries.add(new PartitionEntry(row, -1L, -1L, -1L, -1L));
+ }
+ return partitionEntries;
+ }
+
+ @Override
+ public InnerTableScan withFilter(Predicate predicate) {
+ throw new UnsupportedOperationException("Filter is not supported for
FormatTable.");
+ }
+
+ public static boolean isDataFileName(String fileName) {
+ return fileName != null && !fileName.startsWith(".") &&
!fileName.startsWith("_");
+ }
+
+ private BinaryRow createPartitionRow(LinkedHashMap<String, String>
partitionSpec) {
+ RowType partitionRowType = table.partitionType();
+ List<DataField> fields = partitionRowType.getFields();
+
+ // Create value setters for each field type
+ List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
+ for (DataField field : fields) {
+ valueSetters.add(BinaryWriter.createValueSetter(field.type()));
+ }
+
+ // Create binary row to hold partition values
+ BinaryRow binaryRow = new BinaryRow(fields.size());
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+ // Fill in partition values
+ for (int i = 0; i < fields.size(); i++) {
+ String fieldName = fields.get(i).name();
+ String partitionValue = partitionSpec.get(fieldName);
+
+ if (partitionValue == null ||
partitionValue.equals(table.defaultPartName())) {
+ // Set null for default partition name or missing partition
+ binaryRowWriter.setNullAt(i);
+ } else {
+ // Convert string value to appropriate type and set it
+ Object value = TypeUtils.castFromString(partitionValue,
fields.get(i).type());
+ valueSetters.get(i).setValue(binaryRowWriter, i, value);
+ }
+ }
+
+ binaryRowWriter.complete();
+ return binaryRow;
+ }
+
+ private class FormatTableScanPlan implements Plan {
+ @Override
+ public List<Split> splits() {
+ List<Split> splits = new ArrayList<>();
+ try {
+ FileIO fileIO = table.fileIO();
+ if (!table.partitionKeys().isEmpty()) {
+ List<Pair<LinkedHashMap<String, String>, Path>>
partition2Paths =
+ PartitionPathUtils.searchPartSpecAndPaths(
+ fileIO,
+ new Path(table.location()),
+ table.partitionKeys().size());
+ for (Pair<LinkedHashMap<String, String>, Path>
partition2Path :
+ partition2Paths) {
+ LinkedHashMap<String, String> partitionSpec =
partition2Path.getKey();
+ BinaryRow partitionRow =
createPartitionRow(partitionSpec);
+ if (partitionFilter != null &&
partitionFilter.test(partitionRow)) {
+ splits.addAll(
+ getSplits(fileIO,
partition2Path.getValue(), partitionRow));
+ }
+ }
+ } else {
+ splits.addAll(getSplits(fileIO, new
Path(table.location()), null));
+ }
+ if (limit != null) {
+ if (limit <= 0) {
+ return new ArrayList<>();
+ }
+ if (splits.size() > limit) {
+ return splits.subList(0, limit);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to scan files", e);
+ }
+ return splits;
+ }
+ }
+
+ private List<Split> getSplits(FileIO fileIO, Path path, BinaryRow
partition)
+ throws IOException {
+ List<Split> splits = new ArrayList<>();
+ FileStatus[] files = fileIO.listFiles(path, true);
+ for (FileStatus file : files) {
+ if (isDataFileName(file.getPath().getName())) {
+ FormatDataSplit split =
+ new FormatDataSplit(file.getPath(), 0, file.getLen(),
partition);
+ splits.add(split);
+ }
+ }
+ return splits;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 32135518ba..cb718fc73a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -173,6 +173,40 @@ public class FileStorePathFactory {
createExternalPathProvider(partition, bucket));
}
+ public DataFilePathFactory createFormatTableDataFilePathFactory(BinaryRow
partition) {
+ return new DataFilePathFactory(
+ partitionPath(partition),
+ formatIdentifier,
+ dataFilePrefix,
+ changelogFilePrefix,
+ fileSuffixIncludeCompression,
+ fileCompression,
+ createExternalPartitionPathProvider(partition));
+ }
+
+ private ExternalPathProvider createExternalPartitionPathProvider(BinaryRow
partition) {
+ if (externalPaths == null || externalPaths.isEmpty()) {
+ return null;
+ }
+
+ return new ExternalPathProvider(externalPaths,
partitionPath(partition));
+ }
+
+ public Path partitionPath(BinaryRow partition) {
+ Path relativeBucketPath = null;
+ String partitionPath = getPartitionString(partition);
+ if (!partitionPath.isEmpty()) {
+ relativeBucketPath = new Path(partitionPath);
+ }
+ if (dataFilePathDirectory != null) {
+ relativeBucketPath =
+ relativeBucketPath != null
+ ? new Path(dataFilePathDirectory,
relativeBucketPath)
+ : new Path(dataFilePathDirectory);
+ }
+ return relativeBucketPath != null ? new Path(root, relativeBucketPath)
: root;
+ }
+
@Nullable
private ExternalPathProvider createExternalPathProvider(BinaryRow
partition, int bucket) {
if (externalPaths == null || externalPaths.isEmpty()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index ff4084f9d4..26aa0d77a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -18,10 +18,13 @@
package org.apache.paimon.utils;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedHashMap;
@@ -235,4 +238,62 @@ public class PartitionPathUtils {
return fullPartSpec;
}
+
+ /**
+ * Search all partitions in this path.
+ *
+ * @param path search path.
+ * @param partitionNumber partition number, it will affect path structure.
+ * @return all partition specs to its path.
+ */
+ public static List<Pair<LinkedHashMap<String, String>, Path>>
searchPartSpecAndPaths(
+ FileIO fileIO, Path path, int partitionNumber) {
+ FileStatus[] generatedParts = getFileStatusRecurse(path,
partitionNumber, fileIO);
+ List<Pair<LinkedHashMap<String, String>, Path>> ret = new
ArrayList<>();
+ for (FileStatus part : generatedParts) {
+ // ignore hidden file
+ if (isHiddenFile(part)) {
+ continue;
+ }
+ ret.add(Pair.of(extractPartitionSpecFromPath(part.getPath()),
part.getPath()));
+ }
+ return ret;
+ }
+
+ private static FileStatus[] getFileStatusRecurse(Path path, int
expectLevel, FileIO fileIO) {
+ ArrayList<FileStatus> result = new ArrayList<>();
+
+ try {
+ FileStatus fileStatus = fileIO.getFileStatus(path);
+ listStatusRecursively(fileIO, fileStatus, 0, expectLevel, result);
+ } catch (IOException ignore) {
+ return new FileStatus[0];
+ }
+
+ return result.toArray(new FileStatus[0]);
+ }
+
+ private static void listStatusRecursively(
+ FileIO fileIO,
+ FileStatus fileStatus,
+ int level,
+ int expectLevel,
+ List<FileStatus> results)
+ throws IOException {
+ if (expectLevel == level) {
+ results.add(fileStatus);
+ return;
+ }
+
+ if (fileStatus.isDir()) {
+ for (FileStatus stat : fileIO.listStatus(fileStatus.getPath())) {
+ listStatusRecursively(fileIO, stat, level + 1, expectLevel,
results);
+ }
+ }
+ }
+
+ private static boolean isHiddenFile(FileStatus fileStatus) {
+ String name = fileStatus.getPath().getName();
+ return name.startsWith("_") || name.startsWith(".");
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 2e525a515f..192e8f8171 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -23,11 +23,28 @@ import org.apache.paimon.PagedList;
import org.apache.paimon.TableType;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.HadoopCompressionType;
+import org.apache.paimon.format.SupportsDirectWrite;
+import org.apache.paimon.format.csv.CsvFileFormatFactory;
+import org.apache.paimon.format.json.JsonFileFormatFactory;
+import org.apache.paimon.format.parquet.ParquetFileFormatFactory;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
@@ -37,11 +54,14 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
@@ -54,13 +74,19 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
@@ -555,6 +581,176 @@ public abstract class CatalogTestBase {
.isInstanceOf(RuntimeException.class);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testFormatTableRead(boolean partitioned) throws Exception {
+ if (!supportsFormatTable()) {
+ return;
+ }
+ Random random = new Random();
+ String dbName = "test_db";
+ catalog.createDatabase(dbName, true);
+ int partitionValue = 10;
+ HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f1", DataTypes.INT());
+ schemaBuilder.column("f2", DataTypes.INT());
+ schemaBuilder.column("dt", DataTypes.INT());
+ schemaBuilder.option("type", "format-table");
+ schemaBuilder.option("target-file-size", "1 kb");
+ schemaBuilder.option("file.compression", compressionType.value());
+ String[] formats = {"csv", "parquet", "json"};
+ for (String format : formats) {
+ if (partitioned) {
+ schemaBuilder.partitionKeys("dt");
+ }
+ Identifier identifier = Identifier.create(dbName, "table_" +
format);
+ schemaBuilder.option("file.format", format);
+ catalog.createTable(identifier, schemaBuilder.build(), true);
+ FormatTable table = (FormatTable) catalog.getTable(identifier);
+ int[] projection = new int[] {1, 2};
+ PredicateBuilder builder = new
PredicateBuilder(table.rowType().project(projection));
+ Predicate predicate = builder.greaterOrEqual(0, 10);
+ int size = 5;
+ int checkSize = 3;
+ InternalRow[] datas = new InternalRow[size];
+ InternalRow[] checkDatas = new InternalRow[checkSize];
+ for (int j = 0; j < size; j++) {
+ int f1 = random.nextInt();
+ int f2 = j < checkSize ? random.nextInt(10) + 10 :
random.nextInt(10);
+ datas[j] = GenericRow.of(f1, f2, partitionValue);
+ if (j < checkSize) {
+ checkDatas[j] = GenericRow.of(f2, partitionValue);
+ }
+ }
+ InternalRow dataWithDiffPartition =
+ GenericRow.of(random.nextInt(), random.nextInt(), 11);
+ FormatWriterFactory factory =
+ (buildFileFormatFactory(format)
+ .create(
+ new
FileFormatFactory.FormatContext(
+ new Options(), 1024,
1024)))
+ .createWriterFactory(table.rowType());
+ Map<String, String> partitionSpec = null;
+ if (partitioned) {
+ Path partitionPath =
+ new Path(String.format("%s/%s", table.location(),
"dt=" + partitionValue));
+ DataFilePathFactory dataFilePathFactory =
+ new DataFilePathFactory(
+ partitionPath,
+ format,
+ "data",
+ "change",
+ true,
+ compressionType.value(),
+ null);
+ Path diffPartitionPath =
+ new Path(String.format("%s/%s", table.location(),
"dt=" + 11));
+ DataFilePathFactory diffPartitionPathFactory =
+ new DataFilePathFactory(
+ diffPartitionPath,
+ format,
+ "data",
+ "change",
+ true,
+ compressionType.value(),
+ null);
+ write(factory, dataFilePathFactory.newPath(),
compressionType.value(), datas);
+ write(
+ factory,
+ diffPartitionPathFactory.newPath(),
+ compressionType.value(),
+ dataWithDiffPartition);
+ partitionSpec = new HashMap<>();
+ partitionSpec.put("dt", "" + partitionValue);
+ } else {
+ DataFilePathFactory dataFilePathFactory =
+ new DataFilePathFactory(
+ new Path(table.location()),
+ format,
+ "data",
+ "change",
+ true,
+ compressionType.value(),
+ null);
+ write(factory, dataFilePathFactory.newPath(),
compressionType.value(), datas);
+ }
+ List<InternalRow> readData = read(table, predicate, projection,
partitionSpec, null);
+ Integer limit = checkSize - 1;
+ List<InternalRow> readLimitData =
+ read(table, predicate, projection, partitionSpec, limit);
+ assertThat(readLimitData).hasSize(limit);
+ assertThat(readData).containsExactlyInAnyOrder(checkDatas);
+ catalog.dropTable(Identifier.create(dbName, format), true);
+ }
+ }
+
+ protected FileFormatFactory buildFileFormatFactory(String format) {
+ switch (format) {
+ case "csv":
+ return new CsvFileFormatFactory();
+ case "parquet":
+ return new ParquetFileFormatFactory();
+ case "json":
+ return new JsonFileFormatFactory();
+ default:
+ throw new IllegalArgumentException("Unsupported format: " +
format);
+ }
+ }
+
+ protected void write(
+ FormatWriterFactory factory, Path file, String compression,
InternalRow... rows)
+ throws IOException {
+ FormatWriter writer;
+ PositionOutputStream out = null;
+ if (factory instanceof SupportsDirectWrite) {
+ writer = ((SupportsDirectWrite) factory).create(fileIO, file,
compression);
+ } else {
+ out = fileIO.newOutputStream(file, true);
+ writer = factory.create(out, compression);
+ }
+ for (InternalRow row : rows) {
+ writer.addElement(row);
+ }
+ writer.close();
+ if (out != null) {
+ out.close();
+ }
+ }
+
/**
 * Reads all rows from {@code table} through its {@link ReadBuilder}, applying the given
 * projection, partition filter, predicate and limit, and returns defensive copies of the
 * rows (the reader may reuse row objects between calls).
 *
 * @param table table to read; copied with {@code dynamicOptions} applied first
 * @param predicate filter pushed into the read; applied via {@code executeFilter()}
 * @param projection field indices to project, or null for all fields
 * @param partitionSpec partition filter, or null
 * @param limit max number of rows, or null for no limit
 * @param dynamicOptions extra table options applied via {@link Table#copy}
 * @return the materialized (copied) rows
 */
@SafeVarargs
protected final List<InternalRow> read(
        Table table,
        Predicate predicate,
        @Nullable int[] projection,
        @Nullable Map<String, String> partitionSpec,
        @Nullable Integer limit,
        Pair<ConfigOption<?>, String>... dynamicOptions)
        throws Exception {
    Map<String, String> options = new HashMap<>();
    for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
        options.put(pair.getKey().key(), pair.getValue());
    }
    table = table.copy(options);
    ReadBuilder readBuilder = table.newReadBuilder();
    if (projection != null) {
        readBuilder.withProjection(projection);
    }
    readBuilder.withPartitionFilter(partitionSpec);
    // NOTE(review): the scan is created BEFORE withFilter/withLimit are set on the
    // builder — presumably the format-table read applies filter and limit at read
    // time rather than at planning time; confirm this ordering is intentional.
    TableScan scan = readBuilder.newScan();
    readBuilder.withFilter(predicate);
    if (limit != null) {
        readBuilder.withLimit(limit);
    }
    try (RecordReader<InternalRow> reader =
            readBuilder.newRead().executeFilter().createReader(scan.plan())) {
        InternalRowSerializer serializer = new InternalRowSerializer(readBuilder.readType());
        List<InternalRow> rows = new ArrayList<>();
        // Copy each row: readers commonly reuse the same InternalRow instance.
        reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
        return rows;
    }
}
+
@Test
public void testGetTable() throws Exception {
catalog.createDatabase("test_db", false);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
new file mode 100644
index 0000000000..d992796875
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatDataSplitTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FormatDataSplit}. */
+public class FormatDataSplitTest {
+
+ @Test
+ public void testSerializeAndDeserialize() throws IOException,
ClassNotFoundException {
+ // Create test data
+ Path filePath = new Path("/test/path/file.parquet");
+ RowType rowType = RowType.builder().field("id", new IntType()).build();
+ long modificationTime = System.currentTimeMillis();
+
+ // Create a predicate for testing
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = builder.equal(0, 5);
+
+ // Create FormatDataSplit
+ FormatDataSplit split =
+ new FormatDataSplit(
+ filePath, 100L, // offset
+ 1024L, // length
+ null);
+
+ // Test Java serialization
+ byte[] serialized = InstantiationUtil.serializeObject(split);
+ FormatDataSplit deserialized =
+ InstantiationUtil.deserializeObject(serialized,
getClass().getClassLoader());
+
+ // Verify the deserialized object
+ assertThat(deserialized.filePath()).isEqualTo(filePath);
+ assertThat(deserialized.offset()).isEqualTo(100L);
+ assertThat(deserialized.length()).isEqualTo(1024L);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
new file mode 100644
index 0000000000..326a9138f6
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link FormatTableScan}. */
+class FormatTableScanTest {
+
+ @ParameterizedTest
+ @ValueSource(
+ strings = {
+ "File.txt",
+ "file.txt",
+ "123file.txt",
+ "F",
+ "File-1.txt",
+ "a",
+ "0",
+ "9test",
+ "Test_file.log"
+ })
+ @DisplayName("Test valid filenames that should return true")
+ void testValidDataFileNames(String fileName) {
+ assertTrue(
+ FormatTableScan.isDataFileName(fileName),
+ "Filename '" + fileName + "' should be valid");
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {".hidden", "_file.txt"})
+ @DisplayName("Test invalid filenames that should return false")
+ void testInvalidDataFileNames(String fileName) {
+ assertFalse(
+ FormatTableScan.isDataFileName(fileName),
+ "Filename '" + fileName + "' should be invalid");
+ }
+
+ @Test
+ @DisplayName("Test null input should return false")
+ void testNullInput() {
+ assertFalse(FormatTableScan.isDataFileName(null), "Null input should
return false");
+ }
+}