This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3199bcbd76 [core] Support push down branchesTable by branchName (#6231)
3199bcbd76 is described below

commit 3199bcbd7684fa2142ac8d51ce806874a5b4d927
Author: big face cat <[email protected]>
AuthorDate: Thu Sep 11 17:17:16 2025 +0800

    [core] Support push down branchesTable by branchName (#6231)
---
 .../apache/paimon/table/system/BranchesTable.java  | 106 ++++++++++++++++-----
 .../org/apache/paimon/flink/BranchSqlITCase.java   |  35 +++++++
 2 files changed, 119 insertions(+), 22 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 2330796a33..490f683c63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -25,6 +25,12 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.InPredicateVisitor;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -48,6 +54,8 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -67,11 +75,12 @@ public class BranchesTable implements ReadonlyTable {
 
     public static final String BRANCHES = "branches";
 
+    private static final String BRANCH_NAME = "branch_name";
+
     public static final RowType TABLE_TYPE =
             new RowType(
                     Arrays.asList(
-                            new DataField(
-                                    0, "branch_name", 
SerializationUtils.newStringType(false)),
+                            new DataField(0, BRANCH_NAME, 
SerializationUtils.newStringType(false)),
                             new DataField(1, "create_time", new 
TimestampType(false, 3))));
 
     private final FileIO fileIO;
@@ -97,7 +106,7 @@ public class BranchesTable implements ReadonlyTable {
 
     @Override
     public List<String> primaryKeys() {
-        return Collections.singletonList("branch_name");
+        return Collections.singletonList(BRANCH_NAME);
     }
 
     @Override
@@ -121,16 +130,21 @@ public class BranchesTable implements ReadonlyTable {
     }
 
     private class BranchesScan extends ReadOnceTableScan {
+        private @Nullable Predicate branchPredicate;
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
-            // TODO
+            if (predicate == null) {
+                return this;
+            }
+            branchPredicate = predicate;
+
             return this;
         }
 
         @Override
         public Plan innerPlan() {
-            return () -> Collections.singletonList(new 
BranchesSplit(location));
+            return () -> Collections.singletonList(new BranchesSplit(location, 
branchPredicate));
         }
     }
 
@@ -139,8 +153,11 @@ public class BranchesTable implements ReadonlyTable {
 
         private final Path location;
 
-        private BranchesSplit(Path location) {
+        private final @Nullable Predicate branchPredicate;
+
+        private BranchesSplit(Path location, @Nullable Predicate 
branchPredicate) {
             this.location = location;
+            this.branchPredicate = branchPredicate;
         }
 
         @Override
@@ -152,7 +169,8 @@ public class BranchesTable implements ReadonlyTable {
                 return false;
             }
             BranchesSplit that = (BranchesSplit) o;
-            return Objects.equals(location, that.location);
+            return Objects.equals(location, that.location)
+                    && Objects.equals(branchPredicate, that.branchPredicate);
         }
 
         @Override
@@ -194,10 +212,11 @@ public class BranchesTable implements ReadonlyTable {
             }
 
             Path location = ((BranchesSplit) split).location;
+            Predicate predicate = ((BranchesSplit) split).branchPredicate;
             FileStoreTable table = FileStoreTableFactory.create(fileIO, 
location);
             Iterator<InternalRow> rows;
             try {
-                rows = branches(table).iterator();
+                rows = branches(table, predicate).iterator();
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
@@ -214,23 +233,66 @@ public class BranchesTable implements ReadonlyTable {
             return new IteratorRecordReader<>(rows);
         }
 
-        private List<InternalRow> branches(FileStoreTable table) throws 
IOException {
-            BranchManager branchManager = table.branchManager();
+        /**
+         * Creates an InternalRow for a branch with its creation time.
+         *
+         * @param branchName the name of the branch
+         * @param tablePath the table path
+         * @return InternalRow containing branch name and creation time
+         * @throws IOException if unable to get file status
+         */
+        private InternalRow createBranchRow(String branchName, Path tablePath) 
throws IOException {
+            String branchPath = BranchManager.branchPath(tablePath, 
branchName);
+            long creationTime = fileIO.getFileStatus(new 
Path(branchPath)).getModificationTime();
+            return GenericRow.of(
+                    BinaryString.fromString(branchName),
+                    
Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(creationTime)));
+        }
 
-            List<InternalRow> result = new ArrayList<>();
-            List<String> branches = branchManager.branches();
+        private List<InternalRow> branches(FileStoreTable table, Predicate 
predicate)
+                throws IOException {
+            BranchManager branchManager = table.branchManager();
             Path tablePath = table.location();
-            for (String branch : branches) {
-                String branchPath = BranchManager.branchPath(tablePath, 
branch);
-                long creationTime =
-                        fileIO.getFileStatus(new 
Path(branchPath)).getModificationTime();
-                result.add(
-                        GenericRow.of(
-                                BinaryString.fromString(branch),
-                                Timestamp.fromLocalDateTime(
-                                        
DateTimeUtils.toLocalDateTime(creationTime))));
+            List<InternalRow> result = new ArrayList<>();
+            // Handle predicate filtering for branch_name
+            if (predicate != null) {
+                // Handle Equal predicate
+                if (predicate instanceof LeafPredicate
+                        && ((LeafPredicate) predicate).function() instanceof 
Equal
+                        && ((LeafPredicate) predicate).literals().get(0) 
instanceof BinaryString
+                        && 
predicate.visit(LeafPredicateExtractor.INSTANCE).get(BRANCH_NAME)
+                                != null) {
+                    String equalValue = ((LeafPredicate) 
predicate).literals().get(0).toString();
+                    if (branchManager.branchExists(equalValue)) {
+                        result.add(createBranchRow(equalValue, tablePath));
+                    }
+                }
+
+                // Handle CompoundPredicate (OR case for IN filter)
+                if (predicate instanceof CompoundPredicate) {
+                    CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+                    if ((compoundPredicate.function()) instanceof Or) {
+                        List<String> branchNames = new ArrayList<>();
+                        InPredicateVisitor.extractInElements(predicate, 
BRANCH_NAME)
+                                .ifPresent(
+                                        e ->
+                                                e.stream()
+                                                        .map(Object::toString)
+                                                        
.forEach(branchNames::add));
+                        for (String branchName : branchNames) {
+                            if (branchManager.branchExists(branchName)) {
+                                result.add(createBranchRow(branchName, 
tablePath));
+                            }
+                        }
+                    }
+                }
+            } else {
+                // Fallback to original logic if no predicate
+                List<String> branches = branchManager.branches();
+                for (String branch : branches) {
+                    result.add(createBranchRow(branch, tablePath));
+                }
             }
-
             return result;
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index c593db273b..9850845fef 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -834,6 +834,41 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                         "+I[2, 220, 2200]");
     }
 
+    @Test
+    public void testBranchesTableFilter() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT)");
+        sql("INSERT INTO T VALUES (1, 1)");
+        sql("CALL sys.create_branch('default.T', 'b1')");
+        sql("CALL sys.create_branch('default.T', 'b2')");
+        sql("CALL sys.create_branch('default.T', 'b3')");
+
+        // no filter
+        assertThat(collectResult("SELECT branch_name FROM `T$branches`"))
+                .containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]");
+
+        // equals
+        assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE 
branch_name = 'b2'"))
+                .containsExactlyInAnyOrder("+I[b2]");
+
+        // in
+        assertThat(
+                        collectResult(
+                                "SELECT branch_name FROM `T$branches` WHERE 
branch_name IN ('b1', 'b3')"))
+                .containsExactlyInAnyOrder("+I[b1]", "+I[b3]");
+
+        // in with non-existent branch
+        assertThat(
+                        collectResult(
+                                "SELECT branch_name FROM `T$branches` WHERE 
branch_name IN ('b1', 'non_existent')"))
+                .containsExactlyInAnyOrder("+I[b1]");
+
+        // equals with non-existent branch
+        assertThat(
+                        collectResult(
+                                "SELECT branch_name FROM `T$branches` WHERE 
branch_name = 'non_existent'"))
+                .isEmpty();
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

Reply via email to