This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3b88c28adf [core] support prune read partition for format table (#6347)
3b88c28adf is described below
commit 3b88c28adfa5457cd4c7f4fd78a106c732fad7ba
Author: jerry <[email protected]>
AuthorDate: Sun Sep 28 21:40:24 2025 +0800
[core] support prune read partition for format table (#6347)
---
.../paimon/table/format/FormatReadBuilder.java | 12 +++++++++++-
.../org/apache/paimon/catalog/CatalogTestBase.java | 21 ++++++++++++++++-----
.../paimon/spark/table/PaimonFormatTableTest.scala | 15 ++++++++++-----
3 files changed, 37 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index d538c82e19..c86559ff04 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -47,6 +47,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
@@ -161,10 +162,12 @@ public class FormatReadBuilder implements ReadBuilder {
List<Predicate> readFilters =
excludePredicateWithFields(
PredicateBuilder.splitAnd(filter), new HashSet<>(table.partitionKeys()));
+ RowType dataRowType = getRowTypeWithoutPartition(table.rowType(), table.partitionKeys());
+ RowType readRowType = getRowTypeWithoutPartition(readType(), table.partitionKeys());
FormatReaderFactory readerFactory =
FileFormatDiscover.of(options)
.discover(options.formatType())
- .createReaderFactory(table.rowType(), readType(), readFilters);
+ .createReaderFactory(dataRowType, readRowType, readFilters);
Pair<int[], RowType> partitionMapping =
PartitionUtils.getPartitionMapping(
@@ -183,6 +186,13 @@ public class FormatReadBuilder implements ReadBuilder {
Collections.emptyMap());
}
+ private static RowType getRowTypeWithoutPartition(RowType rowType, List<String> partitionKeys) {
+ return rowType.project(
+ rowType.getFieldNames().stream()
+ .filter(name -> !partitionKeys.contains(name))
+ .collect(Collectors.toList()));
+ }
+
// ===================== Unsupported ===============================
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 55d03ec6b0..9c9e46e98e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -624,8 +624,8 @@ public abstract class CatalogTestBase {
(buildFileFormatFactory(format)
.create(
new FileFormatFactory.FormatContext(
- new Options(), 1024, 1024)))
- .createWriterFactory(table.rowType());
+ new Options(table.options()), 1024, 1024)))
+ .createWriterFactory(getFormatTableWriteRowType(table));
Path partitionPath =
new Path(
String.format(
@@ -700,8 +700,8 @@ public abstract class CatalogTestBase {
(buildFileFormatFactory(format)
.create(
new FileFormatFactory.FormatContext(
- new Options(), 1024, 1024)))
- .createWriterFactory(table.rowType());
+ new Options(table.options()), 1024, 1024)))
+ .createWriterFactory(getFormatTableWriteRowType(table));
Map<String, String> partitionSpec = null;
if (partitioned) {
Path partitionPath =
@@ -767,6 +767,14 @@ public abstract class CatalogTestBase {
}
}
+ protected RowType getFormatTableWriteRowType(Table table) {
+ return table.rowType()
+ .project(
+ table.rowType().getFieldNames().stream()
+ .filter(name -> !table.partitionKeys().contains(name))
+ .collect(Collectors.toList()));
+ }
+
protected FileFormatFactory buildFileFormatFactory(String format) {
switch (format) {
case "csv":
@@ -828,7 +836,10 @@ public abstract class CatalogTestBase {
readBuilder.newRead().executeFilter().createReader(scan.plan())) {
InternalRowSerializer serializer = new InternalRowSerializer(readBuilder.readType());
List<InternalRow> rows = new ArrayList<>();
- reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+ reader.forEachRemaining(
+ row -> {
+ rows.add(serializer.copy(row));
+ });
return rows;
}
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index dd045ebcfa..62d48ac94c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -39,16 +39,21 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
val tableName = "paimon_format_test_csv_options"
withTable(tableName) {
sql(
- s"CREATE TABLE $tableName (f0 INT, f1 INT) USING CSV OPTIONS ('" +
+ s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV OPTIONS ('" +
s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
- .key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+ .key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}') PARTITIONED BY (`ds` bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+ val partition = 20250920
val csvFile =
- new Path(table.location(), "part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
- table.fileIO().writeFile(csvFile, "1|2\n3|4", false)
- checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, 2), Row(3, 4)))
+ new Path(
+ table.location(),
+ s"ds=$partition/part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
+ table.fileIO().writeFile(csvFile, "1|asfasdfsdf\n2|asfasdfsdf", false)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName"),
+ Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition)))
}
}