This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d82165dabb [core] delete branch external file when drop table (#5871)
d82165dabb is described below
commit d82165dabb34cbac279ce4c1a1cc00ccce8a7025
Author: 张凯旋 <[email protected]>
AuthorDate: Fri Aug 1 11:16:01 2025 +0800
[core] delete branch external file when drop table (#5871)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 41 +++++++----
.../flink/PrimaryKeyFileStoreTableITCase.java | 85 ++++++++++++++++++++++
2 files changed, 112 insertions(+), 14 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index ccc59edfa2..841643383c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -52,10 +52,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS;
@@ -318,23 +320,21 @@ public abstract class AbstractCatalog implements Catalog {
checkNotBranch(identifier, "dropTable");
checkNotSystemTable(identifier, "dropTable");
- List<Path> externalPaths = new ArrayList<>();
+ Set<Path> externalPaths = new HashSet<>();
try {
Table table = getTable(identifier);
if (table instanceof FileStoreTable) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
- externalPaths =
- fileStoreTable.schemaManager().listAll().stream()
- .map(
- schema ->
- schema.toSchema()
- .options()
- .get(DATA_FILE_EXTERNAL_PATHS.key()))
- .filter(Objects::nonNull)
- .flatMap(externalPath -> Arrays.stream(externalPath.split(",")))
- .map(Path::new)
- .distinct()
- .collect(Collectors.toList());
+ List<Path> schemaExternalPaths =
+ getSchemaExternalPaths(fileStoreTable.schemaManager().listAll());
+ externalPaths.addAll(schemaExternalPaths);
+ // get table branch external path
+ List<String> branches = fileStoreTable.branchManager().branches();
+ for (String branch : branches) {
+ SchemaManager schemaManager =
+ fileStoreTable.schemaManager().copyWithBranch(branch);
+ externalPaths.addAll(getSchemaExternalPaths(schemaManager.listAll()));
+ }
}
} catch (TableNotExistException e) {
if (ignoreIfNotExists) {
@@ -343,7 +343,20 @@ public abstract class AbstractCatalog implements Catalog {
throw new TableNotExistException(identifier);
}
- dropTableImpl(identifier, externalPaths);
+ dropTableImpl(identifier, new ArrayList<>(externalPaths));
+ }
+
+ private List<Path> getSchemaExternalPaths(List<TableSchema> schemas) {
+ if (schemas == null) {
+ return Collections.emptyList();
+ }
+ return schemas.stream()
+ .map(schema -> schema.toSchema().options().get(DATA_FILE_EXTERNAL_PATHS.key()))
+ .filter(Objects::nonNull)
+ .flatMap(externalPath -> Arrays.stream(externalPath.split(",")))
+ .map(Path::new)
+ .distinct()
+ .collect(Collectors.toList());
}
protected abstract void dropTableImpl(Identifier identifier, List<Path> externalPaths);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 2c9bbaffd0..2930842bcb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -429,6 +429,91 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
assertThat(fileIO.exists(new Path(externalPath2))).isFalse();
}
+ @Test
+ public void testDropTableWithBranchExternalPaths() throws Exception {
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+ .parallelism(1)
+ .build();
+
+ sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ sEnv.executeSql("USE CATALOG testCatalog");
+ String externalPaths = TraceableFileIO.SCHEME + "://" + externalPath1;
+ String externalPath2s = LocalFileIOLoader.SCHEME + "://" + externalPath2;
+ sEnv.executeSql(
+ "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( "
+ + "'bucket' = '1',"
+ + "'data-file.external-paths' = '"
+ + externalPaths
+ + "',"
+ + "'data-file.external-paths.strategy' = 'round-robin'"
+ + ")");
+
+ // create branch
+ sEnv.executeSql(
+ String.format("CALL sys.create_branch('%s.%s', 'branch1')", "default", "T2"));
+
+ // insert data to branch
+ sEnv.executeSql("INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (1, 'A')")
+ .await();
+
+ CloseableIterator<Row> it =
+ collect(sEnv.executeSql("SELECT * FROM T2 /*+ OPTIONS('branch' = 'branch1') */"));
+
+ // read initial data
+ List<String> actual = new ArrayList<>();
+ for (int i = 0; i < 1; i++) {
+ actual.add(it.next().toString());
+ }
+ assertThat(actual).containsExactlyInAnyOrder("+I[1, A]");
+
+ String sql =
+ String.format(
+ "ALTER TABLE `T2$branch_branch1` SET ( 'data-file.external-paths' = '%s')",
+ externalPath2s);
+ sEnv.executeSql(sql);
+ // insert data
+ sEnv.executeSql("INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (2, 'B')")
+ .await();
+
+ for (int i = 0; i < 1; i++) {
+ actual.add(it.next().toString());
+ }
+
+ // alter table external path
+ sEnv.executeSql(
+ "ALTER TABLE T2 SET ( "
+ + "'data-file.external-paths' = '"
+ + externalPath2s
+ + "'"
+ + ")");
+
+ // insert data
+ sEnv.executeSql("INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (3, 'C')")
+ .await();
+
+ for (int i = 0; i < 1; i++) {
+ actual.add(it.next().toString());
+ }
+
+ assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]");
+
+ LocalFileIO fileIO = LocalFileIO.create();
+
+ assertThat(fileIO.exists(new Path(externalPath1))).isTrue();
+ assertThat(fileIO.exists(new Path(externalPath2))).isTrue();
+
+ // drop table
+ sEnv.executeSql("DROP TABLE T2");
+
+ assertThat(fileIO.exists(new Path(path + "/warehouse" + "/default.db" + "/T2"))).isFalse();
+ assertThat(fileIO.exists(new Path(externalPath1))).isFalse();
+ assertThat(fileIO.exists(new Path(externalPath2))).isFalse();
+ }
+
@Test
public void testTableReadWriteWithExternalPathSpecificFS() throws Exception {
TableEnvironment sEnv =