This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b555fca25 [core] fix the issue where the historical schemaId could not be found when reading the branch table. (#4454)
b555fca25 is described below
commit b555fca25fd9ab5ae642756add5628beb08167d6
Author: liming.1018 <[email protected]>
AuthorDate: Tue Nov 5 15:08:08 2024 +0800
[core] fix the issue where the historical schemaId could not be found when reading the branch table. (#4454)
---
.../org/apache/paimon/utils/BranchManager.java | 21 +++++++-----
.../org/apache/paimon/flink/BranchSqlITCase.java | 38 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index c2793de37..bc353bb10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -94,10 +94,7 @@ public class BranchManager {
try {
TableSchema latestSchema = schemaManager.latest().get();
- fileIO.copyFile(
- schemaManager.toSchemaPath(latestSchema.id()),
- schemaManager.copyWithBranch(branchName).toSchemaPath(latestSchema.id()),
- true);
+ copySchemasToBranch(branchName, latestSchema.id());
} catch (IOException e) {
throw new RuntimeException(
String.format(
@@ -123,10 +120,7 @@ public class BranchManager {
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id()),
true);
- fileIO.copyFile(
- schemaManager.toSchemaPath(snapshot.schemaId()),
- schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId()),
- true);
+ copySchemasToBranch(branchName, snapshot.schemaId());
} catch (IOException e) {
throw new RuntimeException(
String.format(
@@ -249,4 +243,15 @@ public class BranchManager {
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
}
+
+ private void copySchemasToBranch(String branchName, long schemaId) throws IOException {
+ for (int i = 0; i <= schemaId; i++) {
+ if (schemaManager.schemaExists(i)) {
+ fileIO.copyFile(
+ schemaManager.toSchemaPath(i),
+ schemaManager.copyWithBranch(branchName).toSchemaPath(i),
+ true);
+ }
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 6cf82131f..1d33a9e8a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
@@ -498,6 +499,43 @@ public class BranchSqlITCase extends CatalogITCaseBase {
.satisfies(anyCauseMatches(IllegalArgumentException.class, errMsg));
}
+ @Test
+ public void testReadBranchTableWithMultiSchemaIds() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')");
+
+ sql("ALTER TABLE `T` ADD (v2 INT)");
+
+ sql("INSERT INTO T VALUES" + " (2, 10, 'cat', 2)," + " (2, 20, 'dog', 2)");
+
+ sql("ALTER TABLE `T` ADD (v3 INT)");
+
+ sql("CALL sys.create_tag('default.T', 'tag1', 2)");
+
+ sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
+
+ FileStoreTable table = paimonTable("T");
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "test");
+ List<Long> schemaIds = schemaManager.listAllIds();
+ assertThat(schemaIds.size()).isEqualTo(2);
+
+ assertThat(collectResult("SELECT * FROM T$branch_test"))
+ .containsExactlyInAnyOrder(
+ "+I[1, 10, apple, null]",
+ "+I[1, 20, banana, null]",
+ "+I[2, 10, cat, 2]",
+ "+I[2, 20, dog, 2]");
+ }
+
private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {