This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-1.1
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.1 by this push:
new fe65ee85cd [#9490] improvement(core): Include lakehouse-generic
catalogs in managed entities for proper drop behavior (#9582)
fe65ee85cd is described below
commit fe65ee85cdf7404ee10116deb7b6ccbf0bca6a3e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 30 13:54:43 2025 +0800
[#9490] improvement(core): Include lakehouse-generic catalogs in managed
entities for proper drop behavior (#9582)
### What changes were proposed in this pull request?
This PR improves the drop behavior for lakehouse-generic catalogs to
properly handle managed entities (Lance tables).
**Changes:**
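- Replaced `CatalogManager.includeManagedEntities()`, which only matched
FILESET catalogs, with a capability-based `isManagedStorageCatalog()`
check: a catalog is treated as managed when its `Capability` reports
managed storage for the SCHEMA scope and for at least one of the TABLE,
FILESET, or MODEL scopes. Lakehouse-generic catalogs (managed storage
for SCHEMA and TABLE) therefore now drop their schemas, and the managed
Lance tables inside them, through the catalog's schema operations.
- `FilesetCatalogCapability` and `ModelCatalogCapability` now also report
the FILESET and MODEL scopes, respectively, as managed storage, so
fileset catalogs keep, and model catalogs now also take, the managed
drop path.
- `containsUserCreatedSchemas()` treats any existing schema in a
managed-storage catalog as user-created, so a non-forced drop of such a
catalog fails with `NonEmptyCatalogException` while it still has schemas.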
- Added comprehensive integration test to verify drop behavior for both
managed and external tables
### Why are the changes needed?
Fix #9490
Currently, when dropping a lakehouse-generic catalog, only metadata is
deleted but the physical data (Lance tables managed by Gravitino)
remains. This PR ensures that:
- When dropping a managed catalog, both metadata and physical data are
properly cleaned up
- Managed tables' physical directories are deleted when the catalog is
dropped
- External tables' physical directories are preserved (not deleted)
### Does this PR introduce _any_ user-facing change?
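Yes. After this change, dropping a lakehouse-generic catalog with
`force=true` will now also delete the physical data for managed Lance
tables, not just the metadata. More generally, whether a catalog's
schemas are dropped through the catalog (rather than only deleted from
the metadata store) is now decided by the catalog's reported
managed-storage capability instead of by its type, so model catalogs
now take this path as well.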
### How was this patch tested?
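- Added integration test
`testDropCatalogWithManagedAndExternalEntities()` that verifies:
- Managed tables are properly deleted (both metadata and physical data)
- External tables' physical data is preserved
- Updated `TestCatalogManager` to mock `CatalogWrapper.capabilities()`
so that the catalog under test reports no managed storage
- Test passed successfully in multiple runs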
Co-authored-by: Jerry Shao <[email protected]>
---
.../catalog/fileset/FilesetCatalogCapability.java | 5 +-
.../generic/GenericCatalogCapability.java | 6 +-
.../test/CatalogGenericCatalogLanceIT.java | 117 ++++++++++++++++++++-
.../catalog/model/ModelCatalogCapability.java | 5 +-
.../apache/gravitino/catalog/CatalogManager.java | 68 +++++++-----
.../gravitino/connector/capability/Capability.java | 5 +-
.../gravitino/catalog/TestCatalogManager.java | 6 ++
7 files changed, 175 insertions(+), 37 deletions(-)
diff --git
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
index ef9e18a514..6c76b0f1a7 100644
---
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
+++
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
@@ -18,14 +18,15 @@
*/
package org.apache.gravitino.catalog.fileset;
-import java.util.Objects;
+import com.google.common.base.Preconditions;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.connector.capability.CapabilityResult;
public class FilesetCatalogCapability implements Capability {
@Override
public CapabilityResult managedStorage(Scope scope) {
- if (Objects.requireNonNull(scope) == Scope.SCHEMA) {
+ Preconditions.checkArgument(scope != null, "Scope cannot be null.");
+ if (scope == Scope.SCHEMA || scope == Scope.FILESET) {
return CapabilityResult.SUPPORTED;
}
return CapabilityResult.unsupported(
diff --git
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
index 5900d0b51e..2a6bce6b79 100644
---
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
+++
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
@@ -18,7 +18,7 @@
*/
package org.apache.gravitino.catalog.lakehouse.generic;
-import java.util.Objects;
+import com.google.common.base.Preconditions;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.connector.capability.CapabilityResult;
@@ -26,8 +26,8 @@ public class GenericCatalogCapability implements Capability {
@Override
public CapabilityResult managedStorage(Scope scope) {
- if (Objects.requireNonNull(scope) == Scope.TABLE
- || Objects.requireNonNull(scope) == Scope.SCHEMA) {
+ Preconditions.checkArgument(scope != null, "Scope cannot be null.");
+ if (scope == Scope.TABLE || scope == Scope.SCHEMA) {
return CapabilityResult.SUPPORTED;
}
diff --git
a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
index fae235818b..d3a34193a7 100644
---
a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
+++
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
@@ -53,10 +53,12 @@ import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.io.FileUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.integration.test.util.BaseIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.Column;
@@ -114,8 +116,6 @@ public class CatalogGenericCatalogLanceIT extends BaseIT {
// Create a temp directory for test use
Path tempDir = Files.createTempDirectory("myTempDir");
tempDirectory = tempDir.toString();
- File file = new File(tempDirectory);
- file.deleteOnExit();
}
@AfterAll
@@ -141,6 +141,8 @@ public class CatalogGenericCatalogLanceIT extends BaseIT {
}
client = null;
+
+ FileUtils.deleteDirectory(new File(tempDirectory));
}
@AfterEach
@@ -934,4 +936,115 @@ public class CatalogGenericCatalogLanceIT extends BaseIT {
boolean dropSuccess =
catalog.asTableCatalog().dropTable(newTableIdentifier);
Assertions.assertTrue(dropSuccess);
}
+
+ @Test
+ void testDropCatalogWithManagedAndExternalEntities() {
+ // Create a new catalog for this test to avoid interfering with other tests
+ String testCatalogName =
GravitinoITUtils.genRandomName("drop_catalog_test");
+ Map<String, String> catalogProperties = Maps.newHashMap();
+ metalake.createCatalog(
+ testCatalogName,
+ Catalog.Type.RELATIONAL,
+ provider,
+ "Test catalog for drop",
+ catalogProperties);
+
+ Catalog testCatalog = metalake.loadCatalog(testCatalogName);
+
+ // Create a schema
+ String testSchemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+ Map<String, String> schemaProperties = createSchemaProperties();
+ testCatalog.asSchemas().createSchema(testSchemaName, "Test schema",
schemaProperties);
+
+ Column[] columns = createColumns();
+
+ // Create a managed (non-external) Lance table
+ String managedTableName = GravitinoITUtils.genRandomName(TABLE_PREFIX +
"_managed");
+ NameIdentifier managedTableIdentifier = NameIdentifier.of(testSchemaName,
managedTableName);
+ String managedTableLocation =
+ String.format("%s/%s/%s", tempDirectory, testSchemaName,
managedTableName);
+
+ Map<String, String> managedTableProperties = createProperties();
+ managedTableProperties.put(Table.PROPERTY_TABLE_FORMAT, "lance");
+ managedTableProperties.put(Table.PROPERTY_LOCATION, managedTableLocation);
+
+ Table managedTable =
+ testCatalog
+ .asTableCatalog()
+ .createTable(
+ managedTableIdentifier,
+ columns,
+ "Managed table",
+ managedTableProperties,
+ Transforms.EMPTY_TRANSFORM,
+ null,
+ null);
+
+ Assertions.assertNotNull(managedTable);
+ File managedTableDir = new File(managedTableLocation);
+ Assertions.assertTrue(
+ managedTableDir.exists(), "Managed table directory should exist after
creation");
+
+ // Create an external Lance table
+ String externalTableName = GravitinoITUtils.genRandomName(TABLE_PREFIX +
"_external");
+ NameIdentifier externalTableIdentifier = NameIdentifier.of(testSchemaName,
externalTableName);
+ String externalTableLocation =
+ String.format("%s/%s/%s", tempDirectory, testSchemaName,
externalTableName);
+
+ Map<String, String> externalTableProperties = createProperties();
+ externalTableProperties.put(Table.PROPERTY_TABLE_FORMAT, "lance");
+ externalTableProperties.put(Table.PROPERTY_LOCATION,
externalTableLocation);
+ externalTableProperties.put(Table.PROPERTY_EXTERNAL, "true");
+
+ Table externalTable =
+ testCatalog
+ .asTableCatalog()
+ .createTable(
+ externalTableIdentifier,
+ columns,
+ "External table",
+ externalTableProperties,
+ Transforms.EMPTY_TRANSFORM,
+ null,
+ null);
+
+ Assertions.assertNotNull(externalTable);
+ File externalTableDir = new File(externalTableLocation);
+ Assertions.assertTrue(
+ externalTableDir.exists(), "External table directory should exist
after creation");
+
+ // Verify both tables exist in catalog
+ Table loadedManagedTable =
testCatalog.asTableCatalog().loadTable(managedTableIdentifier);
+ Assertions.assertNotNull(loadedManagedTable);
+
+ Table loadedExternalTable =
testCatalog.asTableCatalog().loadTable(externalTableIdentifier);
+ Assertions.assertNotNull(loadedExternalTable);
+
+ // Drop the catalog with force=true
+ boolean catalogDropped = metalake.dropCatalog(testCatalogName, true);
+ Assertions.assertTrue(catalogDropped, "Catalog should be dropped
successfully");
+
+ // Verify the catalog is dropped
+ Assertions.assertThrows(
+ NoSuchCatalogException.class,
+ () -> metalake.loadCatalog(testCatalogName),
+ "Catalog should not exist after drop");
+
+ // Verify the managed table's physical directory is removed
+ Assertions.assertFalse(
+ managedTableDir.exists(),
+ "Managed table directory should be removed after dropping catalog");
+
+ // Verify the external table's physical directory is preserved
+ Assertions.assertTrue(
+ externalTableDir.exists(),
+ "External table directory should be preserved after dropping catalog");
+
+ // Clean up external table directory manually
+ try {
+ FileUtils.deleteDirectory(externalTableDir);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete external table directory: {}",
externalTableLocation, e);
+ }
+ }
}
diff --git
a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
index b19c9ba2e6..4a930e33bd 100644
---
a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
+++
b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
@@ -18,14 +18,15 @@
*/
package org.apache.gravitino.catalog.model;
-import java.util.Objects;
+import com.google.common.base.Preconditions;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.connector.capability.CapabilityResult;
public class ModelCatalogCapability implements Capability {
@Override
public CapabilityResult managedStorage(Scope scope) {
- if (Objects.requireNonNull(scope) == Scope.SCHEMA) {
+ Preconditions.checkArgument(scope != null, "Scope cannot be null.");
+ if (scope == Scope.SCHEMA || scope == Scope.MODEL) {
return CapabilityResult.SUPPORTED;
}
return CapabilityResult.unsupported(
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 17c0e97701..6835d5ac28 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -103,6 +103,7 @@ import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.utils.IsolatedClassLoader;
+import org.apache.gravitino.utils.NamespaceUtil;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.gravitino.utils.ThrowableFunction;
import org.slf4j.Logger;
@@ -740,36 +741,31 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
() -> {
checkMetalake(metalakeIdent, store);
try {
- boolean catalogInUse = catalogInUse(store, ident);
+ boolean catalogInUse = getCatalogInUseValue(store, ident);
if (catalogInUse && !force) {
throw new CatalogInUseException(
"Catalog %s is in use, please disable it first or use force
option", ident);
}
- Namespace schemaNamespace =
Namespace.of(ident.namespace().level(0), ident.name());
- CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
-
+ Namespace schemaNs = Namespace.of(ident.namespace().level(0),
ident.name());
List<SchemaEntity> schemaEntities =
- store.list(schemaNamespace, SchemaEntity.class,
EntityType.SCHEMA);
- CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG,
CatalogEntity.class);
+ store.list(schemaNs, SchemaEntity.class, EntityType.SCHEMA);
- if (!force
- && containsUserCreatedSchemas(schemaEntities, catalogEntity,
catalogWrapper)) {
+ CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
+ if (!force && containsUserCreatedSchemas(schemaEntities,
catalogWrapper)) {
throw new NonEmptyCatalogException(
"Catalog %s has schemas, please drop them first or use force
option", ident);
}
- if (includeManagedEntities(catalogEntity)) {
- // code reach here in two cases:
- // 1. the catalog does not have available schemas
- // 2. the catalog has available schemas, and force is true
- // for case 1, the forEach block can drop them without any side
effect
- // for case 2, the forEach block will drop all managed
sub-entities
+ if (isManagedStorageCatalog(catalogWrapper)) {
+ // For managed catalog, we need to call drop schema API to drop
the underlying
+ // entities as well as the related resource first. Directly
deleting the metadata from
+ // the store is not enough.
schemaEntities.forEach(
schema -> {
try {
catalogWrapper.doWithSchemaOps(
- schemaOps ->
schemaOps.dropSchema(schema.nameIdentifier(), true));
+ ops -> ops.dropSchema(schema.nameIdentifier(),
true));
} catch (Exception e) {
LOG.warn("Failed to drop schema {}",
schema.nameIdentifier());
throw new RuntimeException(
@@ -777,6 +773,8 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
}
});
}
+
+ // Finally, delete the catalog entity as well as all its
sub-entities from the store.
catalogCache.invalidate(ident);
return store.delete(ident, EntityType.CATALOG, true);
@@ -805,28 +803,33 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
* </ul>
*
* @param schemaEntities The list of schema entities to check.
- * @param catalogEntity The catalog entity to which the schemas belong.
* @param catalogWrapper The catalog wrapper for the catalog.
* @return True if the list of schema entities contains any valid
user-created schemas, false
* otherwise.
* @throws Exception If an error occurs while checking the schemas.
*/
private boolean containsUserCreatedSchemas(
- List<SchemaEntity> schemaEntities, CatalogEntity catalogEntity,
CatalogWrapper catalogWrapper)
- throws Exception {
+ List<SchemaEntity> schemaEntities, CatalogWrapper catalogWrapper) throws
Exception {
if (schemaEntities.isEmpty()) {
return false;
}
+ if (isManagedStorageCatalog(catalogWrapper)) {
+ // For managed storage catalog, any existing schema entities are
considered user-created. At
+ // this point we already know schemaEntities is not empty, so we can
return true directly
+ // without further checks.
+ return true;
+ }
+
if (schemaEntities.size() == 1) {
- if ("kafka".equals(catalogEntity.getProvider())) {
+ String provider = catalogWrapper.catalog().provider();
+ if ("kafka".equalsIgnoreCase(provider)) {
return false;
-
- } else if ("jdbc-postgresql".equals(catalogEntity.getProvider())) {
+ } else if ("jdbc-postgresql".equalsIgnoreCase(provider)) {
// PostgreSQL catalog includes the "public" schema, see
// https://github.com/apache/gravitino/issues/2314
return !schemaEntities.get(0).name().equals("public");
- } else if ("hive".equals(catalogEntity.getProvider())) {
+ } else if ("hive".equalsIgnoreCase(provider)) {
return !schemaEntities.get(0).name().equals("default");
}
}
@@ -835,7 +838,9 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
catalogWrapper.doWithSchemaOps(
schemaOps ->
schemaOps.listSchemas(
- Namespace.of(catalogEntity.namespace().level(0),
catalogEntity.name())));
+ NamespaceUtil.ofSchema(
+ catalogWrapper.catalog().entity().namespace().level(0),
+ catalogWrapper.catalog().name())));
if (allSchemas.length == 0) {
return false;
}
@@ -848,10 +853,6 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
return
schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
}
- private boolean includeManagedEntities(CatalogEntity catalogEntity) {
- return catalogEntity.getType().equals(FILESET);
- }
-
/**
* Loads the catalog with the specified identifier, wraps it in a
CatalogWrapper, and caches the
* wrapper for reuse.
@@ -893,6 +894,19 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
}
}
+ private boolean isManagedStorageCatalog(CatalogWrapper catalogWrapper) {
+ try {
+ Capability capability = catalogWrapper.capabilities();
+ return capability.managedStorage(Capability.Scope.SCHEMA).supported()
+ && (capability.managedStorage(Capability.Scope.TABLE).supported()
+ ||
capability.managedStorage(Capability.Scope.FILESET).supported()
+ ||
capability.managedStorage(Capability.Scope.MODEL).supported());
+ } catch (Exception e) {
+ // This should not be happened, because capabilities() will never throw
an exception here.
+ throw new RuntimeException(e);
+ }
+ }
+
private CatalogEntity.Builder newCatalogBuilder(Namespace namespace,
CatalogEntity catalog) {
CatalogEntity.Builder builder =
CatalogEntity.builder()
diff --git
a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
index 37d517f2d2..3b33b8f3e2 100644
---
a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
+++
b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
@@ -83,7 +83,10 @@ public interface Capability {
}
/**
- * Check if the entity is fully managed by Gravitino in the scope.
+ * Check if the metadata entity is fully managed by Gravitino for the
passed-in scope. This is
+ * used to determine whether Gravitino should manage the lifecycle of the
metadata and related
+ * data. For example, if a catalog storage is managed, when the catalog is
dropped, Gravitino will
+ * also delete all underlying data associated with the catalog.
*
* @param scope The scope of the capability.
* @return The capability of the managed storage.
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
index b9cede7473..8a4f728af2 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
@@ -43,6 +43,8 @@ import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
import org.apache.gravitino.exceptions.CatalogInUseException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -598,7 +600,11 @@ public class TestCatalogManager {
entityStore.put(schemaEntity);
CatalogManager.CatalogWrapper catalogWrapper =
Mockito.mock(CatalogManager.CatalogWrapper.class);
+ Capability capability = Mockito.mock(Capability.class);
+ CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not
managed");
Mockito.doReturn(catalogWrapper).when(catalogManager).loadCatalogAndWrap(ident);
+ Mockito.doReturn(capability).when(catalogWrapper).capabilities();
+ Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
Mockito.doThrow(new RuntimeException("Failed connect"))
.when(catalogWrapper)
.doWithSchemaOps(any());