This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 46ae50ddbe [#9527] feat(core): Add UDF management framework (#9553)
46ae50ddbe is described below

commit 46ae50ddbebc263fba83acbe1f2b92548699ac7e
Author: mchades <[email protected]>
AuthorDate: Tue Jan 6 16:03:46 2026 +0800

    [#9527] feat(core): Add UDF management framework (#9553)
    
    ### What changes were proposed in this pull request?
    
    Add UDF management framework
    
    ### Why are the changes needed?
    
    Fix: #9527
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    CI pass
---
 .../java/org/apache/gravitino/GravitinoEnv.java    |  25 ++
 .../gravitino/catalog/CapabilityHelpers.java       |   6 +-
 .../gravitino/catalog/FunctionDispatcher.java      |  29 +++
 .../catalog/FunctionNormalizeDispatcher.java       | 145 ++++++++++++
 .../catalog/FunctionOperationDispatcher.java       | 254 +++++++++++++++++++++
 .../catalog/ManagedFunctionOperations.java         | 137 +++++++++++
 .../gravitino/connector/capability/Capability.java |   7 +-
 7 files changed, 600 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index a4403c8126..a9980df00a 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -35,6 +35,9 @@ import 
org.apache.gravitino.catalog.CatalogNormalizeDispatcher;
 import org.apache.gravitino.catalog.FilesetDispatcher;
 import org.apache.gravitino.catalog.FilesetNormalizeDispatcher;
 import org.apache.gravitino.catalog.FilesetOperationDispatcher;
+import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.catalog.FunctionNormalizeDispatcher;
+import org.apache.gravitino.catalog.FunctionOperationDispatcher;
 import org.apache.gravitino.catalog.ModelDispatcher;
 import org.apache.gravitino.catalog.ModelNormalizeDispatcher;
 import org.apache.gravitino.catalog.ModelOperationDispatcher;
@@ -130,6 +133,8 @@ public class GravitinoEnv {
 
   private ModelDispatcher modelDispatcher;
 
+  private FunctionDispatcher functionDispatcher;
+
   private MetalakeDispatcher metalakeDispatcher;
 
   private CredentialOperationDispatcher credentialOperationDispatcher;
@@ -257,6 +262,15 @@ public class GravitinoEnv {
     return modelDispatcher;
   }
 
+  /**
+   * Get the FunctionDispatcher associated with the Gravitino environment.
+   *
+   * @return The FunctionDispatcher instance.
+   */
+  public FunctionDispatcher functionDispatcher() {
+    return functionDispatcher;
+  }
+
   /**
    * Get the PartitionDispatcher associated with the Gravitino environment.
    *
@@ -623,6 +637,17 @@ public class GravitinoEnv {
     ModelNormalizeDispatcher modelNormalizeDispatcher =
         new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
     this.modelDispatcher = new ModelEventDispatcher(eventBus, 
modelNormalizeDispatcher);
+
+    // TODO: Add FunctionHookDispatcher and FunctionEventDispatcher when needed
+    // The operation chain should be:
+    // FunctionEventDispatcher -> FunctionNormalizeDispatcher -> 
FunctionHookDispatcher ->
+    // FunctionOperationDispatcher
+    FunctionOperationDispatcher functionOperationDispatcher =
+        new FunctionOperationDispatcher(
+            catalogManager, schemaOperationDispatcher, entityStore, 
idGenerator);
+    this.functionDispatcher =
+        new FunctionNormalizeDispatcher(functionOperationDispatcher, 
catalogManager);
+
     this.statisticDispatcher =
         new StatisticEventDispatcher(
             eventBus, new StatisticManager(entityStore, idGenerator, config));
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java 
b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
index fe4659c28e..e405cfdd92 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
@@ -131,7 +131,8 @@ public class CapabilityHelpers {
     if (identScope == Capability.Scope.TABLE
         || identScope == Capability.Scope.FILESET
         || identScope == Capability.Scope.TOPIC
-        || identScope == Capability.Scope.MODEL) {
+        || identScope == Capability.Scope.MODEL
+        || identScope == Capability.Scope.FUNCTION) {
       String schema = namespace.level(namespace.length() - 1);
       schema = applyCaseSensitiveOnName(Capability.Scope.SCHEMA, schema, 
capabilities);
       return Namespace.of(metalake, catalog, schema);
@@ -201,7 +202,8 @@ public class CapabilityHelpers {
     String catalog = namespace.level(1);
     if (identScope == Capability.Scope.TABLE
         || identScope == Capability.Scope.FILESET
-        || identScope == Capability.Scope.TOPIC) {
+        || identScope == Capability.Scope.TOPIC
+        || identScope == Capability.Scope.FUNCTION) {
       String schema = namespace.level(namespace.length() - 1);
       schema = applyCapabilitiesOnName(Capability.Scope.SCHEMA, schema, 
capabilities);
       return Namespace.of(metalake, catalog, schema);
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FunctionDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/FunctionDispatcher.java
new file mode 100644
index 0000000000..65c23c920c
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/FunctionDispatcher.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import org.apache.gravitino.function.FunctionCatalog;
+
+/**
+ * {@code FunctionDispatcher} interface acts as a specialization of the {@link 
FunctionCatalog}
+ * interface. This interface is designed to potentially add custom behaviors 
or operations related
+ * to dispatching or handling function-related events or actions that are not 
covered by the
+ * standard {@code FunctionCatalog} operations.
+ *
+ * <p>It currently declares no members of its own; everything it exposes is
+ * inherited from {@code FunctionCatalog}.
+ */
+public interface FunctionDispatcher extends FunctionCatalog {}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FunctionNormalizeDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FunctionNormalizeDispatcher.java
new file mode 100644
index 0000000000..4930a780aa
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FunctionNormalizeDispatcher.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
+import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Type;
+
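+/**
+ * {@code FunctionNormalizeDispatcher} normalizes function identifiers and 
namespaces by applying
+ * case-sensitivity and other naming capabilities before delegating to the 
underlying dispatcher.
+ *
+ * <p>Registering a function normalizes its identifier with
+ * {@code applyCapabilities}; every other operation normalizes only case
+ * sensitivity with {@code applyCaseSensitive}.
+ */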
+public class FunctionNormalizeDispatcher implements FunctionDispatcher {
+  private final CatalogManager catalogManager;
+  private final FunctionDispatcher dispatcher;
+
+  public FunctionNormalizeDispatcher(FunctionDispatcher dispatcher, 
CatalogManager catalogManager) {
+    this.dispatcher = dispatcher;
+    this.catalogManager = catalogManager;
+  }
+
+  @Override
+  public NameIdentifier[] listFunctions(Namespace namespace) throws 
NoSuchSchemaException {
+    Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
+    NameIdentifier[] identifiers = dispatcher.listFunctions(caseSensitiveNs);
+    return normalizeCaseSensitive(identifiers);
+  }
+
+  @Override
+  public Function[] listFunctionInfos(Namespace namespace) throws 
NoSuchSchemaException {
+    return dispatcher.listFunctionInfos(normalizeCaseSensitive(namespace));
+  }
+
+  @Override
+  public Function getFunction(NameIdentifier ident) throws 
NoSuchFunctionException {
+    return dispatcher.getFunction(normalizeCaseSensitive(ident));
+  }
+
+  @Override
+  public Function getFunction(NameIdentifier ident, int version)
+      throws NoSuchFunctionException, NoSuchFunctionVersionException {
+    return dispatcher.getFunction(normalizeCaseSensitive(ident), version);
+  }
+
+  @Override
+  public boolean functionExists(NameIdentifier ident) {
+    return dispatcher.functionExists(normalizeCaseSensitive(ident));
+  }
+
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      Type returnType,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    return dispatcher.registerFunction(
+        normalizeNameIdentifier(ident),
+        comment,
+        functionType,
+        deterministic,
+        returnType,
+        definitions);
+  }
+
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      boolean deterministic,
+      FunctionColumn[] returnColumns,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    return dispatcher.registerFunction(
+        normalizeNameIdentifier(ident), comment, deterministic, returnColumns, 
definitions);
+  }
+
+  @Override
+  public Function alterFunction(NameIdentifier ident, FunctionChange... 
changes)
+      throws NoSuchFunctionException, IllegalArgumentException {
+    return dispatcher.alterFunction(normalizeCaseSensitive(ident), changes);
+  }
+
+  @Override
+  public boolean dropFunction(NameIdentifier ident) {
+    return dispatcher.dropFunction(normalizeCaseSensitive(ident));
+  }
+
+  private Namespace normalizeCaseSensitive(Namespace namespace) {
+    Capability capabilities = 
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+    return applyCaseSensitive(namespace, Capability.Scope.FUNCTION, 
capabilities);
+  }
+
+  private NameIdentifier normalizeCaseSensitive(NameIdentifier functionIdent) {
+    Capability capabilities = getCapability(functionIdent, catalogManager);
+    return applyCaseSensitive(functionIdent, Capability.Scope.FUNCTION, 
capabilities);
+  }
+
+  private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[] 
functionIdents) {
+    if (ArrayUtils.isEmpty(functionIdents)) {
+      return functionIdents;
+    }
+
+    Capability capabilities = getCapability(functionIdents[0], catalogManager);
+    return applyCaseSensitive(functionIdents, Capability.Scope.FUNCTION, 
capabilities);
+  }
+
+  private NameIdentifier normalizeNameIdentifier(NameIdentifier functionIdent) 
{
+    Capability capability = getCapability(functionIdent, catalogManager);
+    return applyCapabilities(functionIdent, Capability.Scope.FUNCTION, 
capability);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
new file mode 100644
index 0000000000..2392053f69
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * {@code FunctionOperationDispatcher} is responsible for dispatching 
function-related operations.
+ *
+ * <p>Unlike ModelCatalog which manages its own schemas (via 
ManagedSchemaOperations), functions are
+ * registered under schemas managed by the underlying catalog (e.g., Hive, 
Iceberg). Therefore, this
+ * dispatcher validates schema existence by calling the underlying catalog's 
schema operations, then
+ * delegates actual storage operations to {@link ManagedFunctionOperations}.
+ */
+public class FunctionOperationDispatcher extends OperationDispatcher 
implements FunctionDispatcher {
+
+  private final SchemaOperationDispatcher schemaOps;
+  private final ManagedFunctionOperations managedFunctionOps;
+
+  /**
+   * Creates a new FunctionOperationDispatcher instance.
+   *
+   * @param catalogManager The CatalogManager instance to be used for function 
operations.
+   * @param schemaOps The SchemaOperationDispatcher used to verify that the
+   *     parent schema exists before a function operation is delegated.
+   * @param store The EntityStore instance to be used for function operations.
+   * @param idGenerator The IdGenerator instance to be used for function 
operations.
+   */
+  public FunctionOperationDispatcher(
+      CatalogManager catalogManager,
+      SchemaOperationDispatcher schemaOps,
+      EntityStore store,
+      IdGenerator idGenerator) {
+    super(catalogManager, store, idGenerator);
+    this.schemaOps = schemaOps;
+    this.managedFunctionOps = new ManagedFunctionOperations(store, 
idGenerator);
+  }
+
+  /**
+   * List the functions in a namespace from the catalog.
+   *
+   * <p>The identifiers are built from the names of the functions returned by
+   * {@link #listFunctionInfos(Namespace)}, so schema validation and locking
+   * are performed by that method.
+   *
+   * @param namespace A namespace.
+   * @return An array of function identifiers in the namespace.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   */
+  @Override
+  public NameIdentifier[] listFunctions(Namespace namespace) throws 
NoSuchSchemaException {
+    return Arrays.stream(listFunctionInfos(namespace))
+        .map(f -> NameIdentifier.of(namespace, f.name()))
+        .toArray(NameIdentifier[]::new);
+  }
+
+  @Override
+  public Function[] listFunctionInfos(Namespace namespace) throws 
NoSuchSchemaException {
+    NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
+    // Validate schema exists in the underlying catalog
+    schemaOps.loadSchema(schemaIdent);
+
+    return TreeLockUtils.doWithTreeLock(
+        schemaIdent, LockType.READ, () -> 
managedFunctionOps.listFunctionInfos(namespace));
+  }
+
+  /**
+   * Get a function by {@link NameIdentifier} from the catalog. Returns the 
latest version.
+   *
+   * @param ident A function identifier.
+   * @return The latest version of the function with the given name.
+   * @throws NoSuchFunctionException If the function does not exist, or if the
+   *     schema that should contain it does not exist.
+   */
+  @Override
+  public Function getFunction(NameIdentifier ident) throws 
NoSuchFunctionException {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    // Validate schema exists in the underlying catalog
+    if (!schemaOps.schemaExists(schemaIdent)) {
+      throw new NoSuchFunctionException("Schema does not exist: %s", 
schemaIdent);
+    }
+
+    return TreeLockUtils.doWithTreeLock(
+        ident, LockType.READ, () -> managedFunctionOps.getFunction(ident));
+  }
+
+  /**
+   * Get a function by {@link NameIdentifier} and version from the catalog.
+   *
+   * @param ident A function identifier.
+   * @param version The function version, counted from 0.
+   * @return The function with the given name and version.
+   * @throws NoSuchFunctionException If the function does not exist, or if the
+   *     schema that should contain it does not exist.
+   * @throws NoSuchFunctionVersionException If the function version does not 
exist.
+   * @throws IllegalArgumentException If {@code version} is negative.
+   */
+  @Override
+  public Function getFunction(NameIdentifier ident, int version)
+      throws NoSuchFunctionException, NoSuchFunctionVersionException {
+    Preconditions.checkArgument(version >= 0, "Function version must be 
non-negative");
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    // Validate schema exists in the underlying catalog
+    if (!schemaOps.schemaExists(schemaIdent)) {
+      throw new NoSuchFunctionException("Schema does not exist: %s", 
schemaIdent);
+    }
+
+    return TreeLockUtils.doWithTreeLock(
+        ident, LockType.READ, () -> managedFunctionOps.getFunction(ident, 
version));
+  }
+
+  /**
+   * Register a scalar or aggregate function with one or more definitions 
(overloads).
+   *
+   * @param ident The function identifier.
+   * @param comment The optional function comment.
+   * @param functionType The function type.
+   * @param deterministic Whether the function is deterministic.
+   * @param returnType The return type.
+   * @param definitions The function definitions.
+   * @return The registered function.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   * @throws FunctionAlreadyExistsException If the function already exists.
+   * @throws IllegalArgumentException If {@code functionType} is neither
+   *     {@code SCALAR} nor {@code AGGREGATE}, if {@code returnType} is null,
+   *     or if {@code definitions} is null or empty.
+   */
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      Type returnType,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    Preconditions.checkArgument(
+        functionType == FunctionType.SCALAR || functionType == 
FunctionType.AGGREGATE,
+        "This method is for scalar or aggregate functions only");
+    Preconditions.checkArgument(returnType != null, "Return type is required");
+    Preconditions.checkArgument(
+        definitions != null && definitions.length > 0, "At least one 
definition is required");
+
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    // Validate schema exists in the underlying catalog
+    schemaOps.loadSchema(schemaIdent);
+
+    return TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.WRITE,
+        () ->
+            managedFunctionOps.registerFunction(
+                ident, comment, functionType, deterministic, returnType, 
definitions));
+  }
+
+  /**
+   * Register a table-valued function with one or more definitions (overloads).
+   *
+   * @param ident The function identifier.
+   * @param comment The optional function comment.
+   * @param deterministic Whether the function is deterministic.
+   * @param returnColumns The return columns.
+   * @param definitions The function definitions.
+   * @return The registered function.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   * @throws FunctionAlreadyExistsException If the function already exists.
+   * @throws IllegalArgumentException If {@code returnColumns} or
+   *     {@code definitions} is null or empty.
+   */
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      boolean deterministic,
+      FunctionColumn[] returnColumns,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    Preconditions.checkArgument(
+        returnColumns != null && returnColumns.length > 0,
+        "At least one return column is required for table-valued function");
+    Preconditions.checkArgument(
+        definitions != null && definitions.length > 0, "At least one 
definition is required");
+
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    // Validate schema exists in the underlying catalog
+    schemaOps.loadSchema(schemaIdent);
+
+    return TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.WRITE,
+        () ->
+            managedFunctionOps.registerFunction(
+                ident, comment, deterministic, returnColumns, definitions));
+  }
+
+  /**
+   * Applies {@link FunctionChange changes} to a function in the catalog.
+   *
+   * @param ident the {@link NameIdentifier} instance of the function to alter.
+   * @param changes the several {@link FunctionChange} instances to apply to 
the function.
+   * @return the updated {@link Function} instance.
+   * @throws NoSuchFunctionException If the function does not exist, or if the
+   *     schema that should contain it does not exist.
+   * @throws IllegalArgumentException If the change is rejected by the 
implementation,
+   *     or if {@code changes} is null or empty.
+   */
+  @Override
+  public Function alterFunction(NameIdentifier ident, FunctionChange... 
changes)
+      throws NoSuchFunctionException, IllegalArgumentException {
+    Preconditions.checkArgument(
+        changes != null && changes.length > 0, "At least one change is 
required");
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    if (!schemaOps.schemaExists(schemaIdent)) {
+      throw new NoSuchFunctionException("Schema does not exist: %s", 
schemaIdent);
+    }
+
+    return TreeLockUtils.doWithTreeLock(
+        ident, LockType.WRITE, () -> managedFunctionOps.alterFunction(ident, 
changes));
+  }
+
+  /**
+   * Drop a function by name.
+   *
+   * @param ident The name identifier of the function.
+   * @return True if the function is deleted, false if the function does not 
exist
+   *     or if the schema that should contain it does not exist.
+   */
+  @Override
+  public boolean dropFunction(NameIdentifier ident) {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    if (!schemaOps.schemaExists(schemaIdent)) {
+      return false;
+    }
+
+    return TreeLockUtils.doWithTreeLock(
+        ident, LockType.WRITE, () -> managedFunctionOps.dropFunction(ident));
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
 
b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
new file mode 100644
index 0000000000..f080f6ee8b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * {@code ManagedFunctionOperations} provides the storage-level operations for 
managing functions in
+ * Gravitino's EntityStore.
+ *
+ * <p>This class is intended to handle the actual persistence of function 
metadata, including:
+ *
+ * <ul>
+ *   <li>Storing function entities and their versions
+ *   <li>Retrieving functions by identifier or version
+ *   <li>Updating function metadata
+ *   <li>Deleting functions and their versions
+ * </ul>
+ *
+ * <p>None of these operations is implemented yet: every method currently
+ * throws {@link UnsupportedOperationException} until {@code FunctionEntity}
+ * is available.
+ */
+public class ManagedFunctionOperations implements FunctionCatalog {
+
+  @SuppressWarnings("UnusedVariable")
+  private static final int INIT_VERSION = 0;
+
+  @SuppressWarnings("UnusedVariable")
+  private final EntityStore store;
+
+  @SuppressWarnings("UnusedVariable")
+  private final IdGenerator idGenerator;
+
+  /**
+   * Creates a new ManagedFunctionOperations instance.
+   *
+   * @param store The EntityStore instance for function persistence.
+   * @param idGenerator The IdGenerator instance for generating unique IDs.
+   */
+  public ManagedFunctionOperations(EntityStore store, IdGenerator idGenerator) 
{
+    this.store = store;
+    this.idGenerator = idGenerator;
+  }
+
+  @Override
+  public NameIdentifier[] listFunctions(Namespace namespace) throws 
NoSuchSchemaException {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException("listFunctions: FunctionEntity not 
yet implemented");
+  }
+
+  @Override
+  public Function[] listFunctionInfos(Namespace namespace) throws 
NoSuchSchemaException {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException(
+        "listFunctionInfos: FunctionEntity not yet implemented");
+  }
+
+  @Override
+  public Function getFunction(NameIdentifier ident) throws 
NoSuchFunctionException {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException("getFunction: FunctionEntity not 
yet implemented");
+  }
+
+  @Override
+  public Function getFunction(NameIdentifier ident, int version)
+      throws NoSuchFunctionException, NoSuchFunctionVersionException {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException(
+        "getFunction with version: FunctionEntity not yet implemented");
+  }
+
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      Type returnType,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException("registerFunction: FunctionEntity 
not yet implemented");
+  }
+
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      boolean deterministic,
+      FunctionColumn[] returnColumns,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException(
+        "registerFunction for table-valued functions: FunctionEntity not yet 
implemented");
+  }
+
+  @Override
+  public Function alterFunction(NameIdentifier ident, FunctionChange... 
changes)
+      throws NoSuchFunctionException, IllegalArgumentException {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException("alterFunction: FunctionEntity not 
yet implemented");
+  }
+
+  @Override
+  public boolean dropFunction(NameIdentifier ident) {
+    // TODO: Implement when FunctionEntity is available
+    throw new UnsupportedOperationException("dropFunction: FunctionEntity not 
yet implemented");
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java 
b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
index 3b33b8f3e2..784946c59e 100644
--- 
a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
+++ 
b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
@@ -40,7 +40,8 @@ public interface Capability {
     FILESET,
     TOPIC,
     PARTITION,
-    MODEL
+    MODEL,
+    FUNCTION
   }
 
   /**
@@ -144,6 +145,10 @@ public interface Capability {
 
     @Override
     public CapabilityResult managedStorage(Scope scope) {
+      if (scope == Scope.FUNCTION) {
+        return CapabilityResult.SUPPORTED;
+      }
+
       return CapabilityResult.unsupported(
           String.format("The %s entity is not fully managed by Gravitino.", 
scope));
     }

Reply via email to