This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b88c28adf [core] support prune read partition for format table (#6347)
3b88c28adf is described below

commit 3b88c28adfa5457cd4c7f4fd78a106c732fad7ba
Author: jerry <[email protected]>
AuthorDate: Sun Sep 28 21:40:24 2025 +0800

    [core] support prune read partition for format table (#6347)
---
 .../paimon/table/format/FormatReadBuilder.java      | 12 +++++++++++-
 .../org/apache/paimon/catalog/CatalogTestBase.java  | 21 ++++++++++++++++-----
 .../paimon/spark/table/PaimonFormatTableTest.scala  | 15 ++++++++++-----
 3 files changed, 37 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index d538c82e19..c86559ff04 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -47,6 +47,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
@@ -161,10 +162,12 @@ public class FormatReadBuilder implements ReadBuilder {
         List<Predicate> readFilters =
                 excludePredicateWithFields(
                         PredicateBuilder.splitAnd(filter), new HashSet<>(table.partitionKeys()));
+        RowType dataRowType = getRowTypeWithoutPartition(table.rowType(), table.partitionKeys());
+        RowType readRowType = getRowTypeWithoutPartition(readType(), table.partitionKeys());
         FormatReaderFactory readerFactory =
                 FileFormatDiscover.of(options)
                         .discover(options.formatType())
-                        .createReaderFactory(table.rowType(), readType(), readFilters);
+                        .createReaderFactory(dataRowType, readRowType, readFilters);
 
         Pair<int[], RowType> partitionMapping =
                 PartitionUtils.getPartitionMapping(
@@ -183,6 +186,13 @@ public class FormatReadBuilder implements ReadBuilder {
                 Collections.emptyMap());
     }
 
+    private static RowType getRowTypeWithoutPartition(RowType rowType, List<String> partitionKeys) {
+        return rowType.project(
+                rowType.getFieldNames().stream()
+                        .filter(name -> !partitionKeys.contains(name))
+                        .collect(Collectors.toList()));
+    }
+
     // ===================== Unsupported ===============================
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 55d03ec6b0..9c9e46e98e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -624,8 +624,8 @@ public abstract class CatalogTestBase {
                     (buildFileFormatFactory(format)
                                     .create(
                                             new FileFormatFactory.FormatContext(
-                                                    new Options(), 1024, 1024)))
-                            .createWriterFactory(table.rowType());
+                                                    new Options(table.options()), 1024, 1024)))
+                            .createWriterFactory(getFormatTableWriteRowType(table));
             Path partitionPath =
                     new Path(
                             String.format(
@@ -700,8 +700,8 @@ public abstract class CatalogTestBase {
                     (buildFileFormatFactory(format)
                                     .create(
                                             new FileFormatFactory.FormatContext(
-                                                    new Options(), 1024, 1024)))
-                            .createWriterFactory(table.rowType());
+                                                    new Options(table.options()), 1024, 1024)))
+                            .createWriterFactory(getFormatTableWriteRowType(table));
             Map<String, String> partitionSpec = null;
             if (partitioned) {
                 Path partitionPath =
@@ -767,6 +767,14 @@ public abstract class CatalogTestBase {
         }
     }
 
+    protected RowType getFormatTableWriteRowType(Table table) {
+        return table.rowType()
+                .project(
+                        table.rowType().getFieldNames().stream()
+                                .filter(name -> !table.partitionKeys().contains(name))
+                                .collect(Collectors.toList()));
+    }
+
     protected FileFormatFactory buildFileFormatFactory(String format) {
         switch (format) {
             case "csv":
@@ -828,7 +836,10 @@ public abstract class CatalogTestBase {
                 readBuilder.newRead().executeFilter().createReader(scan.plan())) {
             InternalRowSerializer serializer = new InternalRowSerializer(readBuilder.readType());
             List<InternalRow> rows = new ArrayList<>();
-            reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+            reader.forEachRemaining(
+                    row -> {
+                        rows.add(serializer.copy(row));
+                    });
             return rows;
         }
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index dd045ebcfa..62d48ac94c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -39,16 +39,21 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     val tableName = "paimon_format_test_csv_options"
     withTable(tableName) {
       sql(
-        s"CREATE TABLE $tableName (f0 INT, f1 INT) USING CSV OPTIONS ('" +
+        s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV OPTIONS ('" +
           s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
           s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
-              .key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+              .key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}') PARTITIONED BY (`ds` bigint)")
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+      val partition = 20250920
       val csvFile =
-        new Path(table.location(), "part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
-      table.fileIO().writeFile(csvFile, "1|2\n3|4", false)
-      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, 2), Row(3, 4)))
+        new Path(
+          table.location(),
+          s"ds=$partition/part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
+      table.fileIO().writeFile(csvFile, "1|asfasdfsdf\n2|asfasdfsdf", false)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableName"),
+        Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition)))
     }
   }
 

Reply via email to