This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3199bcbd76 [core] Support push down branchesTable by branchName (#6231)
3199bcbd76 is described below
commit 3199bcbd7684fa2142ac8d51ce806874a5b4d927
Author: big face cat <[email protected]>
AuthorDate: Thu Sep 11 17:17:16 2025 +0800
[core] Support push down branchesTable by branchName (#6231)
---
.../apache/paimon/table/system/BranchesTable.java | 106 ++++++++++++++++-----
.../org/apache/paimon/flink/BranchSqlITCase.java | 35 +++++++
2 files changed, 119 insertions(+), 22 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 2330796a33..490f683c63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -25,6 +25,12 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.InPredicateVisitor;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
@@ -48,6 +54,8 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
@@ -67,11 +75,12 @@ public class BranchesTable implements ReadonlyTable {
public static final String BRANCHES = "branches";
+ private static final String BRANCH_NAME = "branch_name";
+
public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
- new DataField(
- 0, "branch_name",
SerializationUtils.newStringType(false)),
+ new DataField(0, BRANCH_NAME,
SerializationUtils.newStringType(false)),
new DataField(1, "create_time", new
TimestampType(false, 3))));
private final FileIO fileIO;
@@ -97,7 +106,7 @@ public class BranchesTable implements ReadonlyTable {
@Override
public List<String> primaryKeys() {
- return Collections.singletonList("branch_name");
+ return Collections.singletonList(BRANCH_NAME);
}
@Override
@@ -121,16 +130,21 @@ public class BranchesTable implements ReadonlyTable {
}
private class BranchesScan extends ReadOnceTableScan {
+ private @Nullable Predicate branchPredicate;
@Override
public InnerTableScan withFilter(Predicate predicate) {
- // TODO
+ if (predicate == null) {
+ return this;
+ }
+ branchPredicate = predicate;
+
return this;
}
@Override
public Plan innerPlan() {
- return () -> Collections.singletonList(new
BranchesSplit(location));
+ return () -> Collections.singletonList(new BranchesSplit(location,
branchPredicate));
}
}
@@ -139,8 +153,11 @@ public class BranchesTable implements ReadonlyTable {
private final Path location;
- private BranchesSplit(Path location) {
+ private final @Nullable Predicate branchPredicate;
+
+ private BranchesSplit(Path location, @Nullable Predicate
branchPredicate) {
this.location = location;
+ this.branchPredicate = branchPredicate;
}
@Override
@@ -152,7 +169,8 @@ public class BranchesTable implements ReadonlyTable {
return false;
}
BranchesSplit that = (BranchesSplit) o;
- return Objects.equals(location, that.location);
+ return Objects.equals(location, that.location)
+ && Objects.equals(branchPredicate, that.branchPredicate);
}
@Override
@@ -194,10 +212,11 @@ public class BranchesTable implements ReadonlyTable {
}
Path location = ((BranchesSplit) split).location;
+ Predicate predicate = ((BranchesSplit) split).branchPredicate;
FileStoreTable table = FileStoreTableFactory.create(fileIO,
location);
Iterator<InternalRow> rows;
try {
- rows = branches(table).iterator();
+ rows = branches(table, predicate).iterator();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -214,23 +233,66 @@ public class BranchesTable implements ReadonlyTable {
return new IteratorRecordReader<>(rows);
}
- private List<InternalRow> branches(FileStoreTable table) throws
IOException {
- BranchManager branchManager = table.branchManager();
+ /**
+ * Creates an InternalRow for a branch with its creation time.
+ *
+ * @param branchName the name of the branch
+ * @param tablePath the table path
+ * @return InternalRow containing branch name and creation time
+ * @throws IOException if unable to get file status
+ */
+ private InternalRow createBranchRow(String branchName, Path tablePath)
throws IOException {
+ String branchPath = BranchManager.branchPath(tablePath,
branchName);
+ long creationTime = fileIO.getFileStatus(new
Path(branchPath)).getModificationTime();
+ return GenericRow.of(
+ BinaryString.fromString(branchName),
+
Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(creationTime)));
+ }
- List<InternalRow> result = new ArrayList<>();
- List<String> branches = branchManager.branches();
+ private List<InternalRow> branches(FileStoreTable table, Predicate
predicate)
+ throws IOException {
+ BranchManager branchManager = table.branchManager();
Path tablePath = table.location();
- for (String branch : branches) {
- String branchPath = BranchManager.branchPath(tablePath,
branch);
- long creationTime =
- fileIO.getFileStatus(new
Path(branchPath)).getModificationTime();
- result.add(
- GenericRow.of(
- BinaryString.fromString(branch),
- Timestamp.fromLocalDateTime(
-
DateTimeUtils.toLocalDateTime(creationTime))));
+ List<InternalRow> result = new ArrayList<>();
+ // Handle predicate filtering for branch_name
+ if (predicate != null) {
+ // Handle Equal predicate
+ if (predicate instanceof LeafPredicate
+ && ((LeafPredicate) predicate).function() instanceof
Equal
+ && ((LeafPredicate) predicate).literals().get(0)
instanceof BinaryString
+ &&
predicate.visit(LeafPredicateExtractor.INSTANCE).get(BRANCH_NAME)
+ != null) {
+ String equalValue = ((LeafPredicate)
predicate).literals().get(0).toString();
+ if (branchManager.branchExists(equalValue)) {
+ result.add(createBranchRow(equalValue, tablePath));
+ }
+ }
+
+ // Handle CompoundPredicate (OR case for IN filter)
+ if (predicate instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate)
predicate;
+ if ((compoundPredicate.function()) instanceof Or) {
+ List<String> branchNames = new ArrayList<>();
+ InPredicateVisitor.extractInElements(predicate,
BRANCH_NAME)
+ .ifPresent(
+ e ->
+ e.stream()
+ .map(Object::toString)
+
.forEach(branchNames::add));
+ for (String branchName : branchNames) {
+ if (branchManager.branchExists(branchName)) {
+ result.add(createBranchRow(branchName,
tablePath));
+ }
+ }
+ }
+ }
+ } else {
+ // Fallback to original logic if no predicate
+ List<String> branches = branchManager.branches();
+ for (String branch : branches) {
+ result.add(createBranchRow(branch, tablePath));
+ }
}
-
return result;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index c593db273b..9850845fef 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -834,6 +834,41 @@ public class BranchSqlITCase extends CatalogITCaseBase {
"+I[2, 220, 2200]");
}
+ @Test
+ public void testBranchesTableFilter() throws Exception {
+ sql("CREATE TABLE T (a INT, b INT)");
+ sql("INSERT INTO T VALUES (1, 1)");
+ sql("CALL sys.create_branch('default.T', 'b1')");
+ sql("CALL sys.create_branch('default.T', 'b2')");
+ sql("CALL sys.create_branch('default.T', 'b3')");
+
+ // no filter
+ assertThat(collectResult("SELECT branch_name FROM `T$branches`"))
+ .containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]");
+
+ // equals
+ assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE
branch_name = 'b2'"))
+ .containsExactlyInAnyOrder("+I[b2]");
+
+ // in
+ assertThat(
+ collectResult(
+ "SELECT branch_name FROM `T$branches` WHERE
branch_name IN ('b1', 'b3')"))
+ .containsExactlyInAnyOrder("+I[b1]", "+I[b3]");
+
+ // in with non-existent branch
+ assertThat(
+ collectResult(
+ "SELECT branch_name FROM `T$branches` WHERE
branch_name IN ('b1', 'non_existent')"))
+ .containsExactlyInAnyOrder("+I[b1]");
+
+ // equals with non-existent branch
+ assertThat(
+ collectResult(
+ "SELECT branch_name FROM `T$branches` WHERE
branch_name = 'non_existent'"))
+ .isEmpty();
+ }
+
private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {