This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b555fca25 [core] fix the issue where the historical schemaId could not be found when reading the branch table. (#4454)
b555fca25 is described below

commit b555fca25fd9ab5ae642756add5628beb08167d6
Author: liming.1018 <[email protected]>
AuthorDate: Tue Nov 5 15:08:08 2024 +0800

    [core] fix the issue where the historical schemaId could not be found when reading the branch table. (#4454)
---
 .../org/apache/paimon/utils/BranchManager.java     | 21 +++++++-----
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 38 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index c2793de37..bc353bb10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -94,10 +94,7 @@ public class BranchManager {
 
         try {
             TableSchema latestSchema = schemaManager.latest().get();
-            fileIO.copyFile(
-                    schemaManager.toSchemaPath(latestSchema.id()),
-                    schemaManager.copyWithBranch(branchName).toSchemaPath(latestSchema.id()),
-                    true);
+            copySchemasToBranch(branchName, latestSchema.id());
         } catch (IOException e) {
             throw new RuntimeException(
                     String.format(
@@ -123,10 +120,7 @@ public class BranchManager {
                     snapshotManager.snapshotPath(snapshot.id()),
                     snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id()),
                     true);
-            fileIO.copyFile(
-                    schemaManager.toSchemaPath(snapshot.schemaId()),
-                    schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId()),
-                    true);
+            copySchemasToBranch(branchName, snapshot.schemaId());
         } catch (IOException e) {
             throw new RuntimeException(
                     String.format(
@@ -249,4 +243,15 @@ public class BranchManager {
                 "Branch name cannot be pure numeric string but is '%s'.",
                 branchName);
     }
+
+    private void copySchemasToBranch(String branchName, long schemaId) throws IOException {
+        for (int i = 0; i <= schemaId; i++) {
+            if (schemaManager.schemaExists(i)) {
+                fileIO.copyFile(
+                        schemaManager.toSchemaPath(i),
+                        schemaManager.copyWithBranch(branchName).toSchemaPath(i),
+                        true);
+            }
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 6cf82131f..1d33a9e8a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.SnapshotManager;
@@ -498,6 +499,43 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .satisfies(anyCauseMatches(IllegalArgumentException.class, errMsg));
     }
 
+    @Test
+    public void testReadBranchTableWithMultiSchemaIds() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')");
+
+        sql("ALTER TABLE `T` ADD (v2 INT)");
+
+        sql("INSERT INTO T VALUES" + " (2, 10, 'cat', 2)," + " (2, 20, 'dog', 2)");
+
+        sql("ALTER TABLE `T` ADD (v3 INT)");
+
+        sql("CALL sys.create_tag('default.T', 'tag1', 2)");
+
+        sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
+
+        FileStoreTable table = paimonTable("T");
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "test");
+        List<Long> schemaIds = schemaManager.listAllIds();
+        assertThat(schemaIds.size()).isEqualTo(2);
+
+        assertThat(collectResult("SELECT * FROM T$branch_test"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10, apple, null]",
+                        "+I[1, 20, banana, null]",
+                        "+I[2, 10, cat, 2]",
+                        "+I[2, 20, dog, 2]");
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

Reply via email to