This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d82165dabb [core] delete branch external file when drop table (#5871)
d82165dabb is described below

commit d82165dabb34cbac279ce4c1a1cc00ccce8a7025
Author: 张凯旋 <[email protected]>
AuthorDate: Fri Aug 1 11:16:01 2025 +0800

    [core] delete branch external file when drop table (#5871)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java | 41 +++++++----
 .../flink/PrimaryKeyFileStoreTableITCase.java      | 85 ++++++++++++++++++++++
 2 files changed, 112 insertions(+), 14 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index ccc59edfa2..841643383c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -52,10 +52,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS;
@@ -318,23 +320,21 @@ public abstract class AbstractCatalog implements Catalog {
         checkNotBranch(identifier, "dropTable");
         checkNotSystemTable(identifier, "dropTable");
 
-        List<Path> externalPaths = new ArrayList<>();
+        Set<Path> externalPaths = new HashSet<>();
         try {
             Table table = getTable(identifier);
             if (table instanceof FileStoreTable) {
                 FileStoreTable fileStoreTable = (FileStoreTable) table;
-                externalPaths =
-                        fileStoreTable.schemaManager().listAll().stream()
-                                .map(
-                                        schema ->
-                                                schema.toSchema()
-                                                        .options()
-                                                        
.get(DATA_FILE_EXTERNAL_PATHS.key()))
-                                .filter(Objects::nonNull)
-                                .flatMap(externalPath -> 
Arrays.stream(externalPath.split(",")))
-                                .map(Path::new)
-                                .distinct()
-                                .collect(Collectors.toList());
+                List<Path> schemaExternalPaths =
+                        
getSchemaExternalPaths(fileStoreTable.schemaManager().listAll());
+                externalPaths.addAll(schemaExternalPaths);
+                // get table branch external path
+                List<String> branches = 
fileStoreTable.branchManager().branches();
+                for (String branch : branches) {
+                    SchemaManager schemaManager =
+                            
fileStoreTable.schemaManager().copyWithBranch(branch);
+                    
externalPaths.addAll(getSchemaExternalPaths(schemaManager.listAll()));
+                }
             }
         } catch (TableNotExistException e) {
             if (ignoreIfNotExists) {
@@ -343,7 +343,20 @@ public abstract class AbstractCatalog implements Catalog {
             throw new TableNotExistException(identifier);
         }
 
-        dropTableImpl(identifier, externalPaths);
+        dropTableImpl(identifier, new ArrayList<>(externalPaths));
+    }
+
+    private List<Path> getSchemaExternalPaths(List<TableSchema> schemas) {
+        if (schemas == null) {
+            return Collections.emptyList();
+        }
+        return schemas.stream()
+                .map(schema -> 
schema.toSchema().options().get(DATA_FILE_EXTERNAL_PATHS.key()))
+                .filter(Objects::nonNull)
+                .flatMap(externalPath -> 
Arrays.stream(externalPath.split(",")))
+                .map(Path::new)
+                .distinct()
+                .collect(Collectors.toList());
     }
 
     protected abstract void dropTableImpl(Identifier identifier, List<Path> 
externalPaths);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 2c9bbaffd0..2930842bcb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -429,6 +429,91 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         assertThat(fileIO.exists(new Path(externalPath2))).isFalse();
     }
 
+    @Test
+    public void testDropTableWithBranchExternalPaths() throws Exception {
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        
.checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+                        .parallelism(1)
+                        .build();
+
+        sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+        sEnv.executeSql("USE CATALOG testCatalog");
+        String externalPaths = TraceableFileIO.SCHEME + "://" + externalPath1;
+        String externalPath2s = LocalFileIOLoader.SCHEME + "://" + 
externalPath2;
+        sEnv.executeSql(
+                "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT 
ENFORCED ) "
+                        + "WITH ( "
+                        + "'bucket' = '1',"
+                        + "'data-file.external-paths' = '"
+                        + externalPaths
+                        + "',"
+                        + "'data-file.external-paths.strategy' = 'round-robin'"
+                        + ")");
+
+        // create branch
+        sEnv.executeSql(
+                String.format("CALL sys.create_branch('%s.%s', 'branch1')", 
"default", "T2"));
+
+        // insert data to branch
+        sEnv.executeSql("INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ 
VALUES (1, 'A')")
+                .await();
+
+        CloseableIterator<Row> it =
+                collect(sEnv.executeSql("SELECT * FROM T2 /*+ OPTIONS('branch' 
= 'branch1') */"));
+
+        // read initial data
+        List<String> actual = new ArrayList<>();
+        for (int i = 0; i < 1; i++) {
+            actual.add(it.next().toString());
+        }
+        assertThat(actual).containsExactlyInAnyOrder("+I[1, A]");
+
+        String sql =
+                String.format(
+                        "ALTER TABLE `T2$branch_branch1` SET ( 
'data-file.external-paths' = '%s')",
+                        externalPath2s);
+        sEnv.executeSql(sql);
+        // insert data
+        sEnv.executeSql("INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ 
VALUES (2, 'B')")
+                .await();
+
+        for (int i = 0; i < 1; i++) {
+            actual.add(it.next().toString());
+        }
+
+        // alter table external path
+        sEnv.executeSql(
+                "ALTER TABLE T2 SET ( "
+                        + "'data-file.external-paths' = '"
+                        + externalPath2s
+                        + "'"
+                        + ")");
+
+        // insert data
+        sEnv.executeSql("INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ 
VALUES (3, 'C')")
+                .await();
+
+        for (int i = 0; i < 1; i++) {
+            actual.add(it.next().toString());
+        }
+
+        assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", 
"+I[3, C]");
+
+        LocalFileIO fileIO = LocalFileIO.create();
+
+        assertThat(fileIO.exists(new Path(externalPath1))).isTrue();
+        assertThat(fileIO.exists(new Path(externalPath2))).isTrue();
+
+        // drop table
+        sEnv.executeSql("DROP TABLE T2");
+
+        assertThat(fileIO.exists(new Path(path + "/warehouse" + "/default.db" 
+ "/T2"))).isFalse();
+        assertThat(fileIO.exists(new Path(externalPath1))).isFalse();
+        assertThat(fileIO.exists(new Path(externalPath2))).isFalse();
+    }
+
     @Test
     public void testTableReadWriteWithExternalPathSpecificFS() throws 
Exception {
         TableEnvironment sEnv =

Reply via email to