This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f9c7891f60 [core] delete all schema external files when drop table in AbstractCatalog (#5688)
f9c7891f60 is described below
commit f9c7891f6002cab4674c1a7f78074f5524b499e8
Author: 张凯旋 <[email protected]>
AuthorDate: Fri Jun 6 14:37:56 2025 +0800
[core] delete all schema external files when drop table in AbstractCatalog (#5688)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 15 ++++-
.../flink/PrimaryKeyFileStoreTableITCase.java | 71 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 9b6e0a5c55..c69d2b4691 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -51,6 +51,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -60,6 +61,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS;
import static org.apache.paimon.CoreOptions.OBJECT_LOCATION;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TYPE;
@@ -326,7 +328,18 @@ public abstract class AbstractCatalog implements Catalog {
Table table = getTable(identifier);
if (table instanceof FileStoreTable) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
- externalPaths = fileStoreTable.store().pathFactory().getExternalPaths();
+ externalPaths =
+ fileStoreTable.schemaManager().listAll().stream()
+ .map(
+ schema ->
+ schema.toSchema()
+ .options()
+ .get(DATA_FILE_EXTERNAL_PATHS.key()))
+ .filter(Objects::nonNull)
+ .flatMap(externalPath -> Arrays.stream(externalPath.split(",")))
+ .map(Path::new)
+ .distinct()
+ .collect(Collectors.toList());
}
} catch (TableNotExistException e) {
if (ignoreIfNotExists) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 032a94c9a9..c64bb4ef41 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -351,6 +351,77 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
assertThat(fileIO.exists(new Path(externalPath2))).isFalse();
}
+ @Test
+ public void testDropTableWithAlterExternalPaths() throws Exception {
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+ .parallelism(1)
+ .build();
+
+ sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ sEnv.executeSql("USE CATALOG testCatalog");
+ String externalPaths = TraceableFileIO.SCHEME + "://" + externalPath1;
+ String externalPath2s = LocalFileIOLoader.SCHEME + "://" + externalPath2;
+ sEnv.executeSql(
+ "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( "
+ + "'bucket' = '1',"
+ + "'data-file.external-paths' = '"
+ + externalPaths
+ + "',"
+ + "'data-file.external-paths.strategy' = 'round-robin'"
+ + ")");
+
+ CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T2"));
+
+ // insert data
+ sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
+ // read initial data
+ List<String> actual = new ArrayList<>();
+ for (int i = 0; i < 1; i++) {
+ actual.add(it.next().toString());
+ }
+ assertThat(actual).containsExactlyInAnyOrder("+I[1, A]");
+
+ // insert data
+ sEnv.executeSql("INSERT INTO T2 VALUES (2, 'B')").await();
+
+ for (int i = 0; i < 1; i++) {
+ actual.add(it.next().toString());
+ }
+
+ // alter table external path
+ sEnv.executeSql(
+ "ALTER TABLE T2 SET ( "
+ + "'data-file.external-paths' = '"
+ + externalPath2s
+ + "'"
+ + ")");
+
+ // insert data
+ sEnv.executeSql("INSERT INTO T2 VALUES (3, 'C')").await();
+
+ for (int i = 0; i < 1; i++) {
+ actual.add(it.next().toString());
+ }
+
+ assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]");
+
+ LocalFileIO fileIO = LocalFileIO.create();
+
+ assertThat(fileIO.exists(new Path(externalPath1))).isTrue();
+ assertThat(fileIO.exists(new Path(externalPath2))).isTrue();
+
+ // drop table
+ sEnv.executeSql("DROP TABLE T2");
+
+ assertThat(fileIO.exists(new Path(path + "/warehouse" + "/default.db" + "/T2"))).isFalse();
+ assertThat(fileIO.exists(new Path(externalPath1))).isFalse();
+ assertThat(fileIO.exists(new Path(externalPath2))).isFalse();
+ }
+
@Test
public void testTableReadWriteWithExternalPathSpecificFS() throws Exception {
TableEnvironment sEnv =