This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new d85a9b482 [#4020] feat(core): Add Tag Manage core logic to support tag 
operations (part-2) (#4109)
d85a9b482 is described below

commit d85a9b4825f75e075daf74fe4109831dbe1bf373
Author: Jerry Shao <jerrys...@datastrato.com>
AuthorDate: Tue Jul 16 20:53:04 2024 +0800

    [#4020] feat(core): Add Tag Manage core logic to support tag operations 
(part-2) (#4109)
    
    ### What changes were proposed in this pull request?
    
    This PR add the second part tag core logic to support associate tags
    with metadata object, and query the associations between tags and
    metadata objects.
    
    ### Why are the changes needed?
    
    This is a part of work to support tag system.
    
    Fix: #4020
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO.
    
    ### How was this patch tested?
    
    Add UTs to cover the logic.
---
 .../exceptions/TagAlreadyAssociatedException.java  |  49 +++
 .../java/org/apache/gravitino/EntityStore.java     |  11 +
 .../gravitino/storage/relational/JDBCBackend.java  |  32 ++
 .../storage/relational/RelationalBackend.java      |   3 +-
 .../storage/relational/RelationalEntityStore.java  |  41 ++-
 .../storage/relational/mapper/TagMetaMapper.java   |  33 +-
 .../mapper/TagMetadataObjectRelMapper.java         | 112 ++++++-
 .../relational/po/TagMetadataObjectRelPO.java      | 130 ++++++++
 .../gravitino/storage/relational/po/TagPO.java     |  21 +-
 .../gravitino/storage/relational/po/TopicPO.java   |  26 +-
 .../MetadataObjectService.java}                    |  14 +-
 .../relational/service/RoleMetaService.java        |   6 +-
 .../storage/relational/service/TagMetaService.java | 198 +++++++++++
 .../storage/relational/utils/POConverters.java     |  26 ++
 .../gravitino/tag/SupportsTagOperations.java       |  96 ++++++
 .../java/org/apache/gravitino/tag/TagManager.java  | 210 +++++++++++-
 .../apache/gravitino/utils/MetadataObjectUtil.java |  97 ++++++
 .../apache/gravitino/utils/NameIdentifierUtil.java |  55 ++++
 .../relational/service/TestTagMetaService.java     | 361 +++++++++++++++++++++
 .../storage/relational/utils/TestPOConverters.java |  17 +
 .../org/apache/gravitino/tag/TestTagManager.java   | 355 +++++++++++++++++++-
 .../gravitino/utils/TestMetadataObjectUtil.java    | 124 +++++++
 .../gravitino/utils/TestNameIdentifierUtil.java    |  66 ++++
 scripts/h2/schema-h2.sql                           |   2 +-
 scripts/mysql/schema-0.6.0-mysql.sql               |   2 +-
 scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql     |   2 +-
 26 files changed, 2024 insertions(+), 65 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/exceptions/TagAlreadyAssociatedException.java
 
b/api/src/main/java/org/apache/gravitino/exceptions/TagAlreadyAssociatedException.java
new file mode 100644
index 000000000..61cab11fb
--- /dev/null
+++ 
b/api/src/main/java/org/apache/gravitino/exceptions/TagAlreadyAssociatedException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+
+/** Exception thrown when a tag with specified name already associated to a 
metadata object. */
+public class TagAlreadyAssociatedException extends AlreadyExistsException {
+
+  /**
+   * Constructs a new exception with the specified detail message.
+   *
+   * @param message the detail message.
+   * @param args the arguments to the message.
+   */
+  @FormatMethod
+  public TagAlreadyAssociatedException(String message, Object... args) {
+    super(message, args);
+  }
+
+  /**
+   * Constructs a new exception with the specified detail message and cause.
+   *
+   * @param cause the cause.
+   * @param message the detail message.
+   * @param args the arguments to the message.
+   */
+  @FormatMethod
+  public TagAlreadyAssociatedException(Throwable cause, String message, 
Object... args) {
+    super(cause, message, args);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/EntityStore.java 
b/core/src/main/java/org/apache/gravitino/EntityStore.java
index ec9eab8d4..3693c5bbf 100644
--- a/core/src/main/java/org/apache/gravitino/EntityStore.java
+++ b/core/src/main/java/org/apache/gravitino/EntityStore.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.function.Function;
 import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.tag.SupportsTagOperations;
 import org.apache.gravitino.utils.Executable;
 
 public interface EntityStore extends Closeable {
@@ -183,4 +184,14 @@ public interface EntityStore extends Closeable {
    */
   <R, E extends Exception> R executeInTransaction(Executable<R, E> executable)
       throws E, IOException;
+
+  /**
+   * Get the extra tag operations that are supported by the entity store.
+   *
+   * @return the tag operations object that are supported by the entity store
+   * @throws UnsupportedOperationException if the extra operations are not 
supported
+   */
+  default SupportsTagOperations tagOperations() {
+    throw new UnsupportedOperationException("tag operations are not 
supported");
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 430fb5a55..b635c3887 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -31,6 +31,7 @@ import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.UnsupportedEntityTypeException;
@@ -328,6 +329,37 @@ public class JDBCBackend implements RelationalBackend {
     }
   }
 
+  @Override
+  public List<MetadataObject> 
listAssociatedMetadataObjectsForTag(NameIdentifier tagIdent)
+      throws IOException {
+    return 
TagMetaService.getInstance().listAssociatedMetadataObjectsForTag(tagIdent);
+  }
+
+  @Override
+  public List<TagEntity> listAssociatedTagsForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType)
+      throws NoSuchEntityException, IOException {
+    return TagMetaService.getInstance().listTagsForMetadataObject(objectIdent, 
objectType);
+  }
+
+  @Override
+  public TagEntity getTagForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType, NameIdentifier 
tagIdent)
+      throws NoSuchEntityException, IOException {
+    return TagMetaService.getInstance().getTagForMetadataObject(objectIdent, 
objectType, tagIdent);
+  }
+
+  @Override
+  public List<TagEntity> associateTagsWithMetadataObject(
+      NameIdentifier objectIdent,
+      Entity.EntityType objectType,
+      NameIdentifier[] tagsToAdd,
+      NameIdentifier[] tagsToRemove)
+      throws NoSuchEntityException, EntityAlreadyExistsException, IOException {
+    return TagMetaService.getInstance()
+        .associateTagsWithMetadataObject(objectIdent, objectType, tagsToAdd, 
tagsToRemove);
+  }
+
   enum JDBCBackendType {
     H2(true),
     MYSQL(false);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
index 5c81bd56c..4521a892f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
@@ -29,9 +29,10 @@ import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.tag.SupportsTagOperations;
 
 /** Interface defining the operations for a Relation Backend. */
-public interface RelationalBackend extends Closeable {
+public interface RelationalBackend extends Closeable, SupportsTagOperations {
 
   /**
    * Initializes the Relational Backend environment with the provided 
configuration.
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 25a4290cc..9d5c0b21c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -31,9 +31,12 @@ import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntitySerDe;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.tag.SupportsTagOperations;
 import org.apache.gravitino.utils.Executable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * MySQL, PostgreSQL, etc. If you want to use a different backend, you can 
implement the {@link
  * RelationalBackend} interface
  */
-public class RelationalEntityStore implements EntityStore {
+public class RelationalEntityStore implements EntityStore, 
SupportsTagOperations {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RelationalEntityStore.class);
   public static final ImmutableMap<String, String> RELATIONAL_BACKENDS =
       ImmutableMap.of(
@@ -132,4 +135,40 @@ public class RelationalEntityStore implements EntityStore {
     garbageCollector.close();
     backend.close();
   }
+
+  @Override
+  public SupportsTagOperations tagOperations() {
+    return this;
+  }
+
+  @Override
+  public List<MetadataObject> 
listAssociatedMetadataObjectsForTag(NameIdentifier tagIdent)
+      throws IOException {
+    return backend.listAssociatedMetadataObjectsForTag(tagIdent);
+  }
+
+  @Override
+  public List<TagEntity> listAssociatedTagsForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType)
+      throws NoSuchEntityException, IOException {
+    return backend.listAssociatedTagsForMetadataObject(objectIdent, 
objectType);
+  }
+
+  @Override
+  public TagEntity getTagForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType, NameIdentifier 
tagIdent)
+      throws NoSuchEntityException, IOException {
+    return backend.getTagForMetadataObject(objectIdent, objectType, tagIdent);
+  }
+
+  @Override
+  public List<TagEntity> associateTagsWithMetadataObject(
+      NameIdentifier objectIdent,
+      Entity.EntityType objectType,
+      NameIdentifier[] tagsToAdd,
+      NameIdentifier[] tagsToRemove)
+      throws NoSuchEntityException, EntityAlreadyExistsException, IOException {
+    return backend.associateTagsWithMetadataObject(
+        objectIdent, objectType, tagsToAdd, tagsToRemove);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
index 44149fc5b..7b52d34a1 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetaMapper.java
@@ -31,7 +31,7 @@ public interface TagMetaMapper {
   String TAG_TABLE_NAME = "tag_meta";
 
   @Select(
-      "SELECT tm.tag_id as tagId, tag_name as tagName,"
+      "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
           + " tm.metalake_id as metalakeId,"
           + " tm.tag_comment as comment,"
           + " tm.properties as properties,"
@@ -43,16 +43,41 @@ public interface TagMetaMapper {
           + TAG_TABLE_NAME
           + " tm JOIN "
           + MetalakeMetaMapper.TABLE_NAME
-          + " mm on tm.metalake_id = mm.metalake_id"
+          + " mm ON tm.metalake_id = mm.metalake_id"
           + " WHERE mm.metalake_name = #{metalakeName} AND tm.deleted_at = 0 
AND mm.deleted_at = 0")
   List<TagPO> listTagPOsByMetalake(@Param("metalakeName") String metalakeName);
 
+  @Select(
+      "<script>"
+          + "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+          + " tm.metalake_id as metalakeId,"
+          + " tm.tag_comment as comment,"
+          + " tm.properties as properties,"
+          + " tm.audit_info as auditInfo,"
+          + " tm.current_version as currentVersion,"
+          + " tm.last_version as lastVersion,"
+          + " tm.deleted_at as deletedAt"
+          + " FROM "
+          + TAG_TABLE_NAME
+          + " tm JOIN "
+          + MetalakeMetaMapper.TABLE_NAME
+          + " mm ON tm.metalake_id = mm.metalake_id"
+          + " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name IN "
+          + " <foreach"
+          + " item='tagName' index='index' collection='tagNames' open='(' 
separator=',' close=')'>"
+          + " #{tagName}"
+          + " </foreach>"
+          + " AND tm.deleted_at = 0 AND mm.deleted_at = 0"
+          + "</script>")
+  List<TagPO> listTagPOsByMetalakeAndTagNames(
+      @Param("metalakeName") String metalakeName, @Param("tagNames") 
List<String> tagNames);
+
   @Select(
       "SELECT tm.tag_id as tagId FROM "
           + TAG_TABLE_NAME
           + " tm JOIN "
           + MetalakeMetaMapper.TABLE_NAME
-          + " mm on tm.metalake_id = mm.metalake_id"
+          + " mm ON tm.metalake_id = mm.metalake_id"
           + " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = 
#{tagName}"
           + " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
   Long selectTagIdByMetalakeAndName(
@@ -71,7 +96,7 @@ public interface TagMetaMapper {
           + TAG_TABLE_NAME
           + " tm JOIN "
           + MetalakeMetaMapper.TABLE_NAME
-          + " mm on tm.metalake_id = mm.metalake_id"
+          + " mm ON tm.metalake_id = mm.metalake_id"
           + " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = 
#{tagName}"
           + " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
   TagPO selectTagMetaByMetalakeAndName(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelMapper.java
index a6f56971c..1ade27d8f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TagMetadataObjectRelMapper.java
@@ -18,36 +18,136 @@
  */
 package org.apache.gravitino.storage.relational.mapper;
 
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
+import org.apache.gravitino.storage.relational.po.TagPO;
 import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Insert;
 import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
 import org.apache.ibatis.annotations.Update;
 
 public interface TagMetadataObjectRelMapper {
   String TAG_METADATA_OBJECT_RELATION_TABLE_NAME = "tag_relation_meta";
 
+  @Select(
+      "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+          + " tm.metalake_id as metalakeId, tm.tag_comment as comment, 
tm.properties as properties,"
+          + " tm.audit_info as auditInfo,"
+          + " tm.current_version as currentVersion,"
+          + " tm.last_version as lastVersion,"
+          + " tm.deleted_at as deletedAt"
+          + " FROM "
+          + TagMetaMapper.TAG_TABLE_NAME
+          + " tm JOIN "
+          + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+          + " te ON tm.tag_id = te.tag_id"
+          + " WHERE te.metadata_object_id = #{metadataObjectId}"
+          + " AND te.metadata_object_type = #{metadataObjectType} AND 
te.deleted_at = 0"
+          + " AND tm.deleted_at = 0")
+  List<TagPO> listTagPOsByMetadataObjectIdAndType(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType);
+
+  @Select(
+      "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+          + " tm.metalake_id as metalakeId, tm.tag_comment as comment, 
tm.properties as properties,"
+          + " tm.audit_info as auditInfo,"
+          + " tm.current_version as currentVersion,"
+          + " tm.last_version as lastVersion,"
+          + " tm.deleted_at as deletedAt"
+          + " FROM "
+          + TagMetaMapper.TAG_TABLE_NAME
+          + " tm JOIN "
+          + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+          + " te ON tm.tag_id = te.tag_id"
+          + " WHERE te.metadata_object_id = #{metadataObjectId}"
+          + " AND te.metadata_object_type = #{metadataObjectType} AND 
tm.tag_name = #{tagName}"
+          + " AND te.deleted_at = 0 AND tm.deleted_at = 0")
+  TagPO getTagPOsByMetadataObjectAndTagName(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("tagName") String tagName);
+
+  @Select(
+      "SELECT te.tag_id as tagId, te.metadata_object_id as metadataObjectId,"
+          + " te.metadata_object_type as metadataObjectType, te.audit_info as 
auditInfo,"
+          + " te.current_version as currentVersion, te.last_version as 
lastVersion,"
+          + " te.deleted_at as deletedAt"
+          + " FROM "
+          + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+          + " te JOIN "
+          + TagMetaMapper.TAG_TABLE_NAME
+          + " tm JOIN "
+          + MetalakeMetaMapper.TABLE_NAME
+          + " mm ON te.tag_id = tm.tag_id AND tm.metalake_id = mm.metalake_id"
+          + " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = 
#{tagName}"
+          + " AND te.deleted_at = 0 AND tm.deleted_at = 0 AND mm.deleted_at = 
0")
+  List<TagMetadataObjectRelPO> listTagMetadataObjectRelsByMetalakeAndTagName(
+      @Param("metalakeName") String metalakeName, @Param("tagName") String 
tagName);
+
+  @Insert({
+    "<script>",
+    "INSERT INTO "
+        + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+        + "(tag_id, metadata_object_id, metadata_object_type, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES ",
+    "<foreach collection='tagRels' item='item' separator=','>",
+    "(#{item.tagId},"
+        + " #{item.metadataObjectId},"
+        + " #{item.metadataObjectType},"
+        + " #{item.auditInfo},"
+        + " #{item.currentVersion},"
+        + " #{item.lastVersion},"
+        + " #{item.deletedAt})",
+    "</foreach>",
+    "</script>"
+  })
+  void batchInsertTagMetadataObjectRels(@Param("tagRels") 
List<TagMetadataObjectRelPO> tagRelPOs);
+
+  @Update({
+    "<script>",
+    "UPDATE "
+        + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE tag_id IN ",
+    "<foreach item='tagId' collection='tagIds' open='(' separator=',' 
close=')'>",
+    "#{tagId}",
+    "</foreach>",
+    " And metadata_object_id = #{metadataObjectId}"
+        + " AND metadata_object_type = #{metadataObjectType} AND deleted_at = 
0",
+    "</script>"
+  })
+  void batchDeleteTagMetadataObjectRelsByTagIdsAndMetadataObject(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType,
+      @Param("tagIds") List<Long> tagIds);
+
   @Update(
       "UPDATE "
           + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
-          + " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+          + " te SET te.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
           + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
-          + " WHERE tmo.tag_id IN (SELECT tm.tag_id FROM "
+          + " WHERE te.tag_id IN (SELECT tm.tag_id FROM "
           + TagMetaMapper.TAG_TABLE_NAME
           + " tm WHERE tm.metalake_id IN (SELECT mm.metalake_id FROM "
           + MetalakeMetaMapper.TABLE_NAME
           + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 
0)"
-          + " AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
+          + " AND tm.deleted_at = 0) AND te.deleted_at = 0")
   Integer softDeleteTagMetadataObjectRelsByMetalakeAndTagName(
       @Param("metalakeName") String metalakeName, @Param("tagName") String 
tagName);
 
   @Update(
       "UPDATE "
           + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
-          + " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+          + " te SET te.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
           + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
           + " WHERE EXISTS (SELECT * FROM "
           + TagMetaMapper.TAG_TABLE_NAME
-          + " tm WHERE tm.metalake_id = #{metalakeId} AND tm.tag_id = 
tmo.tag_id"
-          + " AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
+          + " tm WHERE tm.metalake_id = #{metalakeId} AND tm.tag_id = 
te.tag_id"
+          + " AND tm.deleted_at = 0) AND te.deleted_at = 0")
   void softDeleteTagMetadataObjectRelsByMetalakeId(@Param("metalakeId") Long 
metalakeId);
 
   @Delete(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/TagMetadataObjectRelPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/TagMetadataObjectRelPO.java
new file mode 100644
index 000000000..e26b5cfb7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/TagMetadataObjectRelPO.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+@Getter
+public class TagMetadataObjectRelPO {
+  private Long tagId;
+  private Long metadataObjectId;
+  private String metadataObjectType;
+  private String auditInfo;
+  private Long currentVersion;
+  private Long lastVersion;
+  private Long deletedAt;
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TagMetadataObjectRelPO)) {
+      return false;
+    }
+    TagMetadataObjectRelPO tagRelPO = (TagMetadataObjectRelPO) o;
+    return Objects.equal(tagId, tagRelPO.tagId)
+        && Objects.equal(metadataObjectId, tagRelPO.metadataObjectId)
+        && Objects.equal(metadataObjectType, tagRelPO.metadataObjectType)
+        && Objects.equal(auditInfo, tagRelPO.auditInfo)
+        && Objects.equal(currentVersion, tagRelPO.currentVersion)
+        && Objects.equal(lastVersion, tagRelPO.lastVersion)
+        && Objects.equal(deletedAt, tagRelPO.deletedAt);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        tagId,
+        metadataObjectId,
+        metadataObjectType,
+        auditInfo,
+        currentVersion,
+        lastVersion,
+        deletedAt);
+  }
+
+  public static class Builder {
+    private final TagMetadataObjectRelPO tagRelPO;
+
+    private Builder() {
+      tagRelPO = new TagMetadataObjectRelPO();
+    }
+
+    public Builder withTagId(Long tagId) {
+      tagRelPO.tagId = tagId;
+      return this;
+    }
+
+    public Builder withMetadataObjectId(Long metadataObjectId) {
+      tagRelPO.metadataObjectId = metadataObjectId;
+      return this;
+    }
+
+    public Builder withMetadataObjectType(String metadataObjectType) {
+      tagRelPO.metadataObjectType = metadataObjectType;
+      return this;
+    }
+
+    public Builder withAuditInfo(String auditInfo) {
+      tagRelPO.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withCurrentVersion(Long currentVersion) {
+      tagRelPO.currentVersion = currentVersion;
+      return this;
+    }
+
+    public Builder withLastVersion(Long lastVersion) {
+      tagRelPO.lastVersion = lastVersion;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      tagRelPO.deletedAt = deletedAt;
+      return this;
+    }
+
+    private void validate() {
+      Preconditions.checkArgument(tagRelPO.tagId != null, "Tag id is 
required");
+      Preconditions.checkArgument(
+          tagRelPO.metadataObjectId != null, "Metadata object id is required");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(tagRelPO.metadataObjectType),
+          "Metadata object type should not be empty");
+      Preconditions.checkArgument(tagRelPO.auditInfo != null, "Audit info is 
required");
+      Preconditions.checkArgument(tagRelPO.currentVersion != null, "Current 
version is required");
+      Preconditions.checkArgument(tagRelPO.lastVersion != null, "Last version 
is required");
+      Preconditions.checkArgument(tagRelPO.deletedAt != null, "Deleted at is 
required");
+    }
+
+    public TagMetadataObjectRelPO build() {
+      validate();
+      return tagRelPO;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/TagPO.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/TagPO.java
index c3e6656d6..1bf873fe1 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/po/TagPO.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/TagPO.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.storage.relational.po;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
@@ -47,20 +48,20 @@ public class TagPO {
       return false;
     }
     TagPO tagPO = (TagPO) o;
-    return java.util.Objects.equals(tagId, tagPO.tagId)
-        && java.util.Objects.equals(tagName, tagPO.tagName)
-        && java.util.Objects.equals(metalakeId, tagPO.metalakeId)
-        && java.util.Objects.equals(comment, tagPO.comment)
-        && java.util.Objects.equals(properties, tagPO.properties)
-        && java.util.Objects.equals(auditInfo, tagPO.auditInfo)
-        && java.util.Objects.equals(currentVersion, tagPO.currentVersion)
-        && java.util.Objects.equals(lastVersion, tagPO.lastVersion)
-        && java.util.Objects.equals(deletedAt, tagPO.deletedAt);
+    return Objects.equal(tagId, tagPO.tagId)
+        && Objects.equal(tagName, tagPO.tagName)
+        && Objects.equal(metalakeId, tagPO.metalakeId)
+        && Objects.equal(comment, tagPO.comment)
+        && Objects.equal(properties, tagPO.properties)
+        && Objects.equal(auditInfo, tagPO.auditInfo)
+        && Objects.equal(currentVersion, tagPO.currentVersion)
+        && Objects.equal(lastVersion, tagPO.lastVersion)
+        && Objects.equal(deletedAt, tagPO.deletedAt);
   }
 
   @Override
   public int hashCode() {
-    return java.util.Objects.hash(
+    return Objects.hashCode(
         tagId,
         tagName,
         metalakeId,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/TopicPO.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/TopicPO.java
index 960ed52f6..1b78c86d1 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/po/TopicPO.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/TopicPO.java
@@ -18,8 +18,8 @@
  */
 package org.apache.gravitino.storage.relational.po;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
-import java.util.Objects;
 import lombok.Getter;
 
 @Getter
@@ -49,22 +49,22 @@ public class TopicPO {
       return false;
     }
     TopicPO topicPO = (TopicPO) o;
-    return Objects.equals(topicId, topicPO.topicId)
-        && Objects.equals(topicName, topicPO.topicName)
-        && Objects.equals(metalakeId, topicPO.metalakeId)
-        && Objects.equals(catalogId, topicPO.catalogId)
-        && Objects.equals(schemaId, topicPO.schemaId)
-        && Objects.equals(comment, topicPO.comment)
-        && Objects.equals(properties, topicPO.properties)
-        && Objects.equals(auditInfo, topicPO.auditInfo)
-        && Objects.equals(currentVersion, topicPO.currentVersion)
-        && Objects.equals(lastVersion, topicPO.lastVersion)
-        && Objects.equals(deletedAt, topicPO.deletedAt);
+    return Objects.equal(topicId, topicPO.topicId)
+        && Objects.equal(topicName, topicPO.topicName)
+        && Objects.equal(metalakeId, topicPO.metalakeId)
+        && Objects.equal(catalogId, topicPO.catalogId)
+        && Objects.equal(schemaId, topicPO.schemaId)
+        && Objects.equal(comment, topicPO.comment)
+        && Objects.equal(properties, topicPO.properties)
+        && Objects.equal(auditInfo, topicPO.auditInfo)
+        && Objects.equal(currentVersion, topicPO.currentVersion)
+        && Objects.equal(lastVersion, topicPO.lastVersion)
+        && Objects.equal(deletedAt, topicPO.deletedAt);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
+    return Objects.hashCode(
         topicId,
         topicName,
         metalakeId,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/MetadataObjectUtils.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
similarity index 89%
rename from 
core/src/main/java/org/apache/gravitino/storage/relational/utils/MetadataObjectUtils.java
rename to 
core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index dee5f143f..1fa5de878 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/MetadataObjectUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.storage.relational.utils;
+package org.apache.gravitino.storage.relational.service;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -31,24 +31,18 @@ import 
org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.gravitino.storage.relational.po.TopicPO;
-import org.apache.gravitino.storage.relational.service.CatalogMetaService;
-import org.apache.gravitino.storage.relational.service.FilesetMetaService;
-import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
-import org.apache.gravitino.storage.relational.service.SchemaMetaService;
-import org.apache.gravitino.storage.relational.service.TableMetaService;
-import org.apache.gravitino.storage.relational.service.TopicMetaService;
 
 /**
- * MetadataObjectUtils is used for converting full name to entity id and 
converting entity id to
+ * MetadataObjectService is used for converting full name to entity id and 
converting entity id to
  * full name.
  */
-public class MetadataObjectUtils {
+public class MetadataObjectService {
 
   private static final String DOT = ".";
   private static final Joiner DOT_JOINER = Joiner.on(DOT);
   private static final Splitter DOT_SPLITTER = Splitter.on(DOT);
 
-  private MetadataObjectUtils() {}
+  private MetadataObjectService() {}
 
   public static long getMetadataObjectId(
       long metalakeId, String fullName, MetadataObject.Type type) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
index baf3e94e5..cf8a5632a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
@@ -36,7 +36,6 @@ import 
org.apache.gravitino.storage.relational.mapper.UserRoleRelMapper;
 import org.apache.gravitino.storage.relational.po.RolePO;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
-import org.apache.gravitino.storage.relational.utils.MetadataObjectUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
 import org.slf4j.Logger;
@@ -108,7 +107,8 @@ public class RoleMetaService {
             POConverters.initializeSecurablePOBuilderWithVersion(
                 roleEntity.id(), object, getEntityType(object));
         objectBuilder.withEntityId(
-            MetadataObjectUtils.getMetadataObjectId(metalakeId, 
object.fullName(), object.type()));
+            MetadataObjectService.getMetadataObjectId(
+                metalakeId, object.fullName(), object.type()));
         securableObjectPOs.add(objectBuilder.build());
       }
 
@@ -154,7 +154,7 @@ public class RoleMetaService {
 
     for (SecurableObjectPO securableObjectPO : securableObjectPOs) {
       String fullName =
-          MetadataObjectUtils.getMetadataObjectFullName(
+          MetadataObjectService.getMetadataObjectFullName(
               securableObjectPO.getType(), securableObjectPO.getEntityId());
       if (fullName != null) {
         securableObjects.add(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
index 65b68e694..71b827527 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TagMetaService.java
@@ -19,23 +19,33 @@
 package org.apache.gravitino.storage.relational.service;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchTagException;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
+import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.TagPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.tag.TagManager;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 
 public class TagMetaService {
 
@@ -144,6 +154,188 @@ public class TagMetaService {
     return tagDeletedCount[0] + tagMetadataObjectRelDeletedCount[0] > 0;
   }
 
+  public List<TagEntity> listTagsForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType)
+      throws NoSuchTagException, IOException {
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(objectIdent, objectType);
+    String metalake = objectIdent.namespace().level(0);
+
+    List<TagPO> tagPOs = null;
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+      Long metadataObjectId =
+          MetadataObjectService.getMetadataObjectId(
+              metalakeId, metadataObject.fullName(), metadataObject.type());
+
+      tagPOs =
+          SessionUtils.doWithoutCommitAndFetchResult(
+              TagMetadataObjectRelMapper.class,
+              mapper ->
+                  mapper.listTagPOsByMetadataObjectIdAndType(
+                      metadataObjectId, metadataObject.type().toString()));
+    } catch (RuntimeException e) {
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.TAG, 
objectIdent.toString());
+      throw e;
+    }
+
+    return tagPOs.stream()
+        .map(tagPO -> POConverters.fromTagPO(tagPO, 
TagManager.ofTagNamespace(metalake)))
+        .collect(Collectors.toList());
+  }
+
+  public TagEntity getTagForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType, NameIdentifier 
tagIdent)
+      throws NoSuchEntityException, IOException {
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(objectIdent, objectType);
+    String metalake = objectIdent.namespace().level(0);
+
+    TagPO tagPO = null;
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+      Long metadataObjectId =
+          MetadataObjectService.getMetadataObjectId(
+              metalakeId, metadataObject.fullName(), metadataObject.type());
+
+      tagPO =
+          SessionUtils.getWithoutCommit(
+              TagMetadataObjectRelMapper.class,
+              mapper ->
+                  mapper.getTagPOsByMetadataObjectAndTagName(
+                      metadataObjectId, metadataObject.type().toString(), 
tagIdent.name()));
+    } catch (RuntimeException e) {
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.TAG, 
tagIdent.toString());
+      throw e;
+    }
+
+    if (tagPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.TAG.name().toLowerCase(),
+          tagIdent.name());
+    }
+
+    return POConverters.fromTagPO(tagPO, TagManager.ofTagNamespace(metalake));
+  }
+
+  public List<MetadataObject> 
listAssociatedMetadataObjectsForTag(NameIdentifier tagIdent)
+      throws IOException {
+    String metalakeName = tagIdent.namespace().level(0);
+    String tagName = tagIdent.name();
+
+    try {
+      List<TagMetadataObjectRelPO> tagMetadataObjectRelPOs =
+          SessionUtils.doWithCommitAndFetchResult(
+              TagMetadataObjectRelMapper.class,
+              mapper ->
+                  
mapper.listTagMetadataObjectRelsByMetalakeAndTagName(metalakeName, tagName));
+
+      List<MetadataObject> metadataObjects = Lists.newArrayList();
+      for (TagMetadataObjectRelPO po : tagMetadataObjectRelPOs) {
+        String fullName =
+            MetadataObjectService.getMetadataObjectFullName(
+                po.getMetadataObjectType(), po.getMetadataObjectId());
+
+        // Metadata object may be deleted asynchronously when we query the 
name, so it will return
+        // null. We should skip this metadata object.
+        if (fullName == null) {
+          continue;
+        }
+
+        MetadataObject.Type type = 
MetadataObject.Type.valueOf(po.getMetadataObjectType());
+        metadataObjects.add(MetadataObjects.parse(fullName, type));
+      }
+
+      return metadataObjects;
+    } catch (RuntimeException e) {
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.TAG, 
tagIdent.toString());
+      throw e;
+    }
+  }
+
+  public List<TagEntity> associateTagsWithMetadataObject(
+      NameIdentifier objectIdent,
+      Entity.EntityType objectType,
+      NameIdentifier[] tagsToAdd,
+      NameIdentifier[] tagsToRemove)
+      throws NoSuchEntityException, EntityAlreadyExistsException, IOException {
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(objectIdent, objectType);
+    String metalake = objectIdent.namespace().level(0);
+
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+      Long metadataObjectId =
+          MetadataObjectService.getMetadataObjectId(
+              metalakeId, metadataObject.fullName(), metadataObject.type());
+
+      // Fetch all the tags need to associate with the metadata object.
+      List<String> tagNamesToAdd =
+          
Arrays.stream(tagsToAdd).map(NameIdentifier::name).collect(Collectors.toList());
+      List<TagPO> tagPOsToAdd =
+          tagNamesToAdd.isEmpty()
+              ? Collections.emptyList()
+              : getTagPOsByMetalakeAndNames(metalake, tagNamesToAdd);
+
+      // Fetch all the tags need to remove from the metadata object.
+      List<String> tagNamesToRemove =
+          
Arrays.stream(tagsToRemove).map(NameIdentifier::name).collect(Collectors.toList());
+      List<TagPO> tagPOsToRemove =
+          tagNamesToRemove.isEmpty()
+              ? Collections.emptyList()
+              : getTagPOsByMetalakeAndNames(metalake, tagNamesToRemove);
+
+      SessionUtils.doMultipleWithCommit(
+          () -> {
+            // Insert the tag metadata object relations.
+            if (tagPOsToAdd.isEmpty()) {
+              return;
+            }
+
+            List<TagMetadataObjectRelPO> tagRelsToAdd =
+                tagPOsToAdd.stream()
+                    .map(
+                        tagPO ->
+                            
POConverters.initializeTagMetadataObjectRelPOWithVersion(
+                                tagPO.getTagId(),
+                                metadataObjectId,
+                                metadataObject.type().toString()))
+                    .collect(Collectors.toList());
+            SessionUtils.doWithoutCommit(
+                TagMetadataObjectRelMapper.class,
+                mapper -> 
mapper.batchInsertTagMetadataObjectRels(tagRelsToAdd));
+          },
+          () -> {
+            // Remove the tag metadata object relations.
+            if (tagPOsToRemove.isEmpty()) {
+              return;
+            }
+
+            List<Long> tagIdsToRemove =
+                
tagPOsToRemove.stream().map(TagPO::getTagId).collect(Collectors.toList());
+            SessionUtils.doWithoutCommit(
+                TagMetadataObjectRelMapper.class,
+                mapper ->
+                    
mapper.batchDeleteTagMetadataObjectRelsByTagIdsAndMetadataObject(
+                        metadataObjectId, metadataObject.type().toString(), 
tagIdsToRemove));
+          });
+
+      // Fetch all the tags associated with the metadata object after the 
operation.
+      List<TagPO> tagPOs =
+          SessionUtils.doWithoutCommitAndFetchResult(
+              TagMetadataObjectRelMapper.class,
+              mapper ->
+                  mapper.listTagPOsByMetadataObjectIdAndType(
+                      metadataObjectId, metadataObject.type().toString()));
+
+      return tagPOs.stream()
+          .map(tagPO -> POConverters.fromTagPO(tagPO, 
TagManager.ofTagNamespace(metalake)))
+          .collect(Collectors.toList());
+
+    } catch (RuntimeException e) {
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.TAG, 
objectIdent.toString());
+      throw e;
+    }
+  }
+
   public int deleteTagMetasByLegacyTimeline(long legacyTimeline, int limit) {
     int[] tagDeletedCount = new int[] {0};
     int[] tagMetadataObjectRelDeletedCount = new int[] {0};
@@ -177,4 +369,10 @@ public class TagMetaService {
     }
     return tagPO;
   }
+
+  private List<TagPO> getTagPOsByMetalakeAndNames(String metalakeName, 
List<String> tagNames) {
+    return SessionUtils.getWithoutCommit(
+        TagMetaMapper.class,
+        mapper -> mapper.listTagPOsByMetalakeAndTagNames(metalakeName, 
tagNames));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 7f0aa1983..490ad6112 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.utils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -56,10 +57,12 @@ import org.apache.gravitino.storage.relational.po.RolePO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.TagPO;
 import org.apache.gravitino.storage.relational.po.TopicPO;
 import org.apache.gravitino.storage.relational.po.UserPO;
 import org.apache.gravitino.storage.relational.po.UserRoleRelPO;
+import org.apache.gravitino.utils.PrincipalUtils;
 
 /** POConverters is a utility class to convert PO to Base and vice versa. */
 public class POConverters {
@@ -1015,4 +1018,27 @@ public class POConverters {
       throw new RuntimeException("Failed to serialize json object:", e);
     }
   }
+
+  public static TagMetadataObjectRelPO 
initializeTagMetadataObjectRelPOWithVersion(
+      Long tagId, Long metadataObjectId, String metadataObjectType) {
+    try {
+      AuditInfo auditInfo =
+          AuditInfo.builder()
+              .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+              .withCreateTime(Instant.now())
+              .build();
+
+      return TagMetadataObjectRelPO.builder()
+          .withTagId(tagId)
+          .withMetadataObjectId(metadataObjectId)
+          .withMetadataObjectType(metadataObjectType)
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo))
+          .withCurrentVersion(INIT_VERSION)
+          .withLastVersion(INIT_VERSION)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/tag/SupportsTagOperations.java 
b/core/src/main/java/org/apache/gravitino/tag/SupportsTagOperations.java
new file mode 100644
index 000000000..fd635a024
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/tag/SupportsTagOperations.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.tag;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.TagEntity;
+
+/**
+ * An interface to support extra tag operations, this interface should be 
mixed with {@link
+ * EntityStore} to provide extra operations.
+ *
+ * <p>Any operations that can be done by the entity store should be added here.
+ */
+public interface SupportsTagOperations {
+
+  /**
+   * List all the metadata objects that are associated with the given tag.
+   *
+   * @param tagIdent The identifier of the tag.
+   * @return The list of metadata objects associated with the given tag.
+   * @throws IOException If an error occurs while accessing the entity store.
+   */
+  List<MetadataObject> listAssociatedMetadataObjectsForTag(NameIdentifier 
tagIdent)
+      throws IOException;
+
+  /**
+   * List all the tags that are associated with the given metadata object.
+   *
+   * @param objectIdent The identifier of the metadata object.
+   * @param objectType The type of the metadata object.
+   * @return The list of tags associated with the given metadata object.
+   * @throws NoSuchEntityException if the metadata object does not exist.
+   * @throws IOException If an error occurs while accessing the entity store.
+   */
+  List<TagEntity> listAssociatedTagsForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType)
+      throws NoSuchEntityException, IOException;
+
+  /**
+   * Get the tag with the given identifier that is associated with the given 
metadata object.
+   *
+   * @param objectIdent The identifier of the metadata object.
+   * @param objectType The type of the metadata object.
+   * @param tagIdent The identifier of the tag.
+   * @return The tag associated with the metadata object.
+   * @throws NoSuchEntityException if the metadata object does not exist or 
the tag is not
+   *     associated to the metadata object.
+   * @throws IOException If an error occurs while accessing the entity store.
+   */
+  TagEntity getTagForMetadataObject(
+      NameIdentifier objectIdent, Entity.EntityType objectType, NameIdentifier 
tagIdent)
+      throws NoSuchEntityException, IOException;
+
+  /**
+   * Associate the given tags with the given metadata object.
+   *
+   * @param objectIdent The identifier of the metadata object.
+   * @param objectType The type of the metadata object.
+   * @param tagsToAdd The name of tags to associate with the metadata object.
+   * @param tagsToRemove the name of tags to remove from the metadata object.
+   * @return The list of tags associated with the metadata object after the 
operation.
+   * @throws NoSuchEntityException if the metadata object does not exist.
+   * @throws EntityAlreadyExistsException if tags already associated with the 
metadata object.
+   * @throws IOException If an error occurs while accessing the entity store.
+   */
+  List<TagEntity> associateTagsWithMetadataObject(
+      NameIdentifier objectIdent,
+      Entity.EntityType objectType,
+      NameIdentifier[] tagsToAdd,
+      NameIdentifier[] tagsToRemove)
+      throws NoSuchEntityException, EntityAlreadyExistsException, IOException;
+}
diff --git a/core/src/main/java/org/apache/gravitino/tag/TagManager.java 
b/core/src/main/java/org/apache/gravitino/tag/TagManager.java
index 392d70672..1dc65e2d8 100644
--- a/core/src/main/java/org/apache/gravitino/tag/TagManager.java
+++ b/core/src/main/java/org/apache/gravitino/tag/TagManager.java
@@ -19,20 +19,27 @@
 package org.apache.gravitino.tag;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchTagException;
+import org.apache.gravitino.exceptions.NotFoundException;
+import org.apache.gravitino.exceptions.TagAlreadyAssociatedException;
 import org.apache.gravitino.exceptions.TagAlreadyExistsException;
 import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
@@ -40,6 +47,7 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.kv.KvEntityStore;
+import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +60,8 @@ public class TagManager {
 
   private final EntityStore entityStore;
 
+  private final SupportsTagOperations supportsTagOperations;
+
   public TagManager(IdGenerator idGenerator, EntityStore entityStore) {
     if (entityStore instanceof KvEntityStore) {
       String errorMsg =
@@ -61,6 +71,16 @@ public class TagManager {
       throw new RuntimeException(errorMsg);
     }
 
+    if (!(entityStore instanceof SupportsTagOperations)) {
+      String errorMsg =
+          "TagManager cannot run with entity store that does not support tag 
operations, "
+              + "please configure the entity store to use relational entity 
store and restart the Gravitino server";
+      LOG.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+
+    this.supportsTagOperations = entityStore.tagOperations();
+
     this.idGenerator = idGenerator;
     this.entityStore = entityStore;
   }
@@ -84,10 +104,6 @@ public class TagManager {
         });
   }
 
-  public MetadataObject[] listAssociatedMetadataObjectsForTag(String metalake, 
String name) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
   public Tag createTag(String metalake, String name, String comment, 
Map<String, String> properties)
       throws TagAlreadyExistsException {
     Map<String, String> tagProperties = properties == null ? 
Collections.emptyMap() : properties;
@@ -188,22 +204,162 @@ public class TagManager {
         });
   }
 
-  public String[] listTagsForMetadataObject(String metalake, MetadataObject 
metadataObject) {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public MetadataObject[] listMetadataObjectsForTag(String metalake, String 
name)
+      throws NoSuchTagException {
+    NameIdentifier tagId = ofTagIdent(metalake, name);
+    return TreeLockUtils.doWithTreeLock(
+        tagId,
+        LockType.READ,
+        () -> {
+          checkMetalakeExists(metalake, entityStore);
+
+          try {
+            if (!entityStore.exists(tagId, Entity.EntityType.TAG)) {
+              throw new NoSuchTagException(
+                  "Tag with name %s under metalake %s does not exist", name, 
metalake);
+            }
+
+            return supportsTagOperations
+                .listAssociatedMetadataObjectsForTag(tagId)
+                .toArray(new MetadataObject[0]);
+          } catch (IOException e) {
+            LOG.error("Failed to list metadata objects for tag {}", name, e);
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  public String[] listTagsForMetadataObject(String metalake, MetadataObject 
metadataObject)
+      throws NotFoundException {
+    return Arrays.stream(listTagsInfoForMetadataObject(metalake, 
metadataObject))
+        .map(Tag::name)
+        .toArray(String[]::new);
   }
 
-  public Tag[] listTagsInfoForMetadataObject(String metalake, MetadataObject 
metadataObject) {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public Tag[] listTagsInfoForMetadataObject(String metalake, MetadataObject 
metadataObject)
+      throws NotFoundException {
+    NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(metadataObject);
+
+    if (!checkAndImportEntity(metalake, metadataObject, 
GravitinoEnv.getInstance())) {
+      throw new NotFoundException(
+          "Failed to list tags for metadata object %s due to not found", 
metadataObject);
+    }
+
+    return TreeLockUtils.doWithTreeLock(
+        entityIdent,
+        LockType.READ,
+        () -> {
+          try {
+            return supportsTagOperations
+                .listAssociatedTagsForMetadataObject(entityIdent, entityType)
+                .toArray(new Tag[0]);
+          } catch (NoSuchEntityException e) {
+            throw new NotFoundException(
+                e, "Failed to list tags for metadata object %s due to not 
found", metadataObject);
+          } catch (IOException e) {
+            LOG.error("Failed to list tags for metadata object {}", 
metadataObject, e);
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   public Tag getTagForMetadataObject(String metalake, MetadataObject 
metadataObject, String name)
-      throws NoSuchTagException {
-    throw new UnsupportedOperationException("Not implemented yet");
+      throws NotFoundException {
+    NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(metadataObject);
+    NameIdentifier tagIdent = ofTagIdent(metalake, name);
+
+    if (!checkAndImportEntity(metalake, metadataObject, 
GravitinoEnv.getInstance())) {
+      throw new NotFoundException(
+          "Failed to get tag for metadata object %s due to not found", 
metadataObject);
+    }
+
+    return TreeLockUtils.doWithTreeLock(
+        entityIdent,
+        LockType.READ,
+        () -> {
+          try {
+            return supportsTagOperations.getTagForMetadataObject(entityIdent, 
entityType, tagIdent);
+          } catch (NoSuchEntityException e) {
+            if (e.getMessage().contains("No such tag entity")) {
+              throw new NoSuchTagException(
+                  e, "Tag %s does not exist for metadata object %s", name, 
metadataObject);
+            } else {
+              throw new NotFoundException(
+                  e, "Failed to get tag for metadata object %s due to not 
found", metadataObject);
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to get tag for metadata object {}", 
metadataObject, e);
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   public String[] associateTagsForMetadataObject(
-      String metalake, MetadataObject metadataObject, String[] tagsToAdd, 
String[] tagsToRemove) {
-    throw new UnsupportedOperationException("Not implemented yet");
+      String metalake, MetadataObject metadataObject, String[] tagsToAdd, 
String[] tagsToRemove)
+      throws NotFoundException, TagAlreadyAssociatedException {
+    Preconditions.checkArgument(
+        !metadataObject.type().equals(MetadataObject.Type.METALAKE)
+            && !metadataObject.type().equals(MetadataObject.Type.COLUMN),
+        "Cannot associate tags for unsupported metadata object type %s",
+        metadataObject.type());
+
+    NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(metadataObject);
+
+    if (!checkAndImportEntity(metalake, metadataObject, 
GravitinoEnv.getInstance())) {
+      throw new NotFoundException(
+          "Failed to associate tags for metadata object %s due to not found", 
metadataObject);
+    }
+
+    // Remove all the tags that are both set to add and remove
+    Set<String> tagsToAddSet = tagsToAdd == null ? Sets.newHashSet() : 
Sets.newHashSet(tagsToAdd);
+    Set<String> tagsToRemoveSet =
+        tagsToRemove == null ? Sets.newHashSet() : 
Sets.newHashSet(tagsToRemove);
+    Set<String> common = Sets.intersection(tagsToAddSet, 
tagsToRemoveSet).immutableCopy();
+    tagsToAddSet.removeAll(common);
+    tagsToRemoveSet.removeAll(common);
+
+    NameIdentifier[] tagsToAddIdent =
+        tagsToAddSet.stream().map(tag -> ofTagIdent(metalake, 
tag)).toArray(NameIdentifier[]::new);
+    NameIdentifier[] tagsToRemoveIdent =
+        tagsToRemoveSet.stream()
+            .map(tag -> ofTagIdent(metalake, tag))
+            .toArray(NameIdentifier[]::new);
+
+    return TreeLockUtils.doWithTreeLock(
+        entityIdent,
+        LockType.READ,
+        () ->
+            TreeLockUtils.doWithTreeLock(
+                NameIdentifier.of(ofTagNamespace(metalake).levels()),
+                LockType.WRITE,
+                () -> {
+                  try {
+                    return supportsTagOperations
+                        .associateTagsWithMetadataObject(
+                            entityIdent, entityType, tagsToAddIdent, 
tagsToRemoveIdent)
+                        .stream()
+                        .map(Tag::name)
+                        .toArray(String[]::new);
+                  } catch (NoSuchEntityException e) {
+                    throw new NotFoundException(
+                        e,
+                        "Failed to associate tags for metadata object %s due 
to not found",
+                        metadataObject);
+                  } catch (EntityAlreadyExistsException e) {
+                    throw new TagAlreadyAssociatedException(
+                        e,
+                        "Failed to associate tags for metadata object due to 
some tags %s already "
+                            + "associated to the metadata object %s",
+                        Arrays.toString(tagsToAdd),
+                        metadataObject);
+                  } catch (IOException e) {
+                    LOG.error("Failed to associate tags for metadata object 
{}", metadataObject, e);
+                    throw new RuntimeException(e);
+                  }
+                }));
   }
 
   private static void checkMetalakeExists(String metalake, EntityStore 
entityStore) {
@@ -219,7 +375,6 @@ public class TagManager {
     }
   }
 
-  @VisibleForTesting
   public static Namespace ofTagNamespace(String metalake) {
     return Namespace.of(metalake, Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.TAG_SCHEMA_NAME);
   }
@@ -267,4 +422,33 @@ public class TagManager {
                 .build())
         .build();
   }
+
+  // This method will check if the entity is existed explicitly, internally 
this check will load
+  // the entity from underlying sources to entity store if not stored, and 
will allocate an uid
+  // for this entity, with this uid tags can be associated with this entity.
+  // This method should be called out of the tree lock, otherwise it will 
cause deadlock.
+  @VisibleForTesting
+  boolean checkAndImportEntity(String metalake, MetadataObject metadataObject, 
GravitinoEnv env) {
+    NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(metadataObject);
+
+    switch (entityType) {
+      case METALAKE:
+        return env.metalakeDispatcher().metalakeExists(entityIdent);
+      case CATALOG:
+        return env.catalogDispatcher().catalogExists(entityIdent);
+      case SCHEMA:
+        return env.schemaDispatcher().schemaExists(entityIdent);
+      case TABLE:
+        return env.tableDispatcher().tableExists(entityIdent);
+      case TOPIC:
+        return env.topicDispatcher().topicExists(entityIdent);
+      case FILESET:
+        return env.filesetDispatcher().filesetExists(entityIdent);
+      case COLUMN:
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported metadata object type: " + metadataObject.type());
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
new file mode 100644
index 000000000..005456858
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+
+public class MetadataObjectUtil {
+
+  private static final Joiner DOT = Joiner.on(".");
+
+  private static final BiMap<MetadataObject.Type, Entity.EntityType> 
TYPE_TO_TYPE_MAP =
+      ImmutableBiMap.<MetadataObject.Type, Entity.EntityType>builder()
+          .put(MetadataObject.Type.METALAKE, Entity.EntityType.METALAKE)
+          .put(MetadataObject.Type.CATALOG, Entity.EntityType.CATALOG)
+          .put(MetadataObject.Type.SCHEMA, Entity.EntityType.SCHEMA)
+          .put(MetadataObject.Type.TABLE, Entity.EntityType.TABLE)
+          .put(MetadataObject.Type.TOPIC, Entity.EntityType.TOPIC)
+          .put(MetadataObject.Type.FILESET, Entity.EntityType.FILESET)
+          .put(MetadataObject.Type.COLUMN, Entity.EntityType.COLUMN)
+          .build();
+
+  private MetadataObjectUtil() {}
+
+  /**
+   * Map the given {@link MetadataObject}'s type to the corresponding {@link 
Entity.EntityType}.
+   *
+   * @param metadataObject The metadata object
+   * @return The entity type
+   * @throws IllegalArgumentException if the metadata object type is unknown
+   */
+  public static Entity.EntityType toEntityType(MetadataObject metadataObject) {
+    Preconditions.checkArgument(metadataObject != null, "metadataObject cannot 
be null");
+
+    return Optional.ofNullable(TYPE_TO_TYPE_MAP.get(metadataObject.type()))
+        .orElseThrow(
+            () ->
+                new IllegalArgumentException(
+                    "Unknown metadata object type: " + metadataObject.type()));
+  }
+
+  /**
+   * Convert the given {@link MetadataObject} full name to the corresponding 
{@link NameIdentifier}.
+   *
+   * @param metalakeName The metalake name
+   * @param metadataObject The metadata object
+   * @return The entity identifier
+   * @throws IllegalArgumentException if the metadata object type is 
unsupported or unknown.
+   */
+  public static NameIdentifier toEntityIdent(String metalakeName, 
MetadataObject metadataObject) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metalakeName), "metalakeName cannot be blank");
+    Preconditions.checkArgument(metadataObject != null, "metadataObject cannot 
be null");
+
+    switch (metadataObject.type()) {
+      case METALAKE:
+        return NameIdentifierUtil.ofMetalake(metalakeName);
+      case CATALOG:
+      case SCHEMA:
+      case TABLE:
+      case TOPIC:
+      case FILESET:
+        String fullName = DOT.join(metalakeName, metadataObject.fullName());
+        return NameIdentifier.parse(fullName);
+      case COLUMN:
+        throw new IllegalArgumentException(
+            "Cannot convert column metadata object to entity identifier: "
+                + metadataObject.fullName());
+      default:
+        throw new IllegalArgumentException(
+            "Unknown metadata object type: " + metadataObject.type());
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index fcdaf43e0..616deb235 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -18,8 +18,13 @@
  */
 package org.apache.gravitino.utils;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
 import org.apache.gravitino.exceptions.IllegalNamespaceException;
@@ -193,4 +198,54 @@ public class NameIdentifierUtil {
       throw new IllegalNamespaceException(message, args);
     }
   }
+
+  /**
+   * Convert the given {@link NameIdentifier} and {@link Entity.EntityType} to 
{@link
+   * MetadataObject}.
+   *
+   * @param ident The identifier
+   * @param entityType The entity type
+   * @return The converted {@link MetadataObject}
+   */
+  public static MetadataObject toMetadataObject(
+      NameIdentifier ident, Entity.EntityType entityType) {
+    Preconditions.checkArgument(
+        ident != null && entityType != null, "The identifier and entity type 
must not be null");
+
+    Joiner dot = Joiner.on(".");
+
+    switch (entityType) {
+      case METALAKE:
+        checkMetalake(ident);
+        return MetadataObjects.of(null, ident.name(), 
MetadataObject.Type.METALAKE);
+
+      case CATALOG:
+        checkCatalog(ident);
+        return MetadataObjects.of(null, ident.name(), 
MetadataObject.Type.CATALOG);
+
+      case SCHEMA:
+        checkSchema(ident);
+        String schemaParent = ident.namespace().level(1);
+        return MetadataObjects.of(schemaParent, ident.name(), 
MetadataObject.Type.SCHEMA);
+
+      case TABLE:
+        checkTable(ident);
+        String tableParent = dot.join(ident.namespace().level(1), 
ident.namespace().level(2));
+        return MetadataObjects.of(tableParent, ident.name(), 
MetadataObject.Type.TABLE);
+
+      case FILESET:
+        checkFileset(ident);
+        String filesetParent = dot.join(ident.namespace().level(1), 
ident.namespace().level(2));
+        return MetadataObjects.of(filesetParent, ident.name(), 
MetadataObject.Type.FILESET);
+
+      case TOPIC:
+        checkTopic(ident);
+        String topicParent = dot.join(ident.namespace().level(1), 
ident.namespace().level(2));
+        return MetadataObjects.of(topicParent, ident.name(), 
MetadataObject.Type.TOPIC);
+
+      default:
+        throw new IllegalArgumentException(
+            "Entity type " + entityType + " is not supported to convert to 
MetadataObject");
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTagMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTagMetaService.java
index 06aec2335..5bb76ad9c 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTagMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTagMetaService.java
@@ -23,9 +23,18 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.TestJDBCBackend;
@@ -297,4 +306,356 @@ public class TestTagMetaService extends TestJDBCBackend {
         NoSuchEntityException.class,
         () -> 
tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName + "1", 
"tag2")));
   }
+
+  @Test
+  public void testAssociateAndDisassociateTagsWithMetadataObject() throws 
IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of(metalakeName), 
"catalog1", auditInfo);
+    backend.insert(catalog, false);
+
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName, catalog.name()),
+            "schema1",
+            auditInfo);
+    backend.insert(schema, false);
+
+    TableEntity table =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName, catalog.name(), schema.name()),
+            "table1",
+            auditInfo);
+    backend.insert(table, false);
+
+    // Create tags to associate
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+    TagEntity tagEntity1 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity1, false);
+
+    TagEntity tagEntity2 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag2")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity2, false);
+
+    TagEntity tagEntity3 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag3")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity3, false);
+
+    // Test associate tags with metadata object
+    NameIdentifier[] tagsToAdd =
+        new NameIdentifier[] {
+          TagManager.ofTagIdent(metalakeName, "tag1"),
+          TagManager.ofTagIdent(metalakeName, "tag2"),
+          TagManager.ofTagIdent(metalakeName, "tag3")
+        };
+
+    List<TagEntity> tagEntities =
+        tagMetaService.associateTagsWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), tagsToAdd, new 
NameIdentifier[0]);
+    Assertions.assertEquals(3, tagEntities.size());
+    Assertions.assertTrue(tagEntities.contains(tagEntity1));
+    Assertions.assertTrue(tagEntities.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities.contains(tagEntity3));
+
+    // Test disassociate tags with metadata object
+    NameIdentifier[] tagsToRemove =
+        new NameIdentifier[] {TagManager.ofTagIdent(metalakeName, "tag1")};
+
+    List<TagEntity> tagEntities1 =
+        tagMetaService.associateTagsWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), new NameIdentifier[0], 
tagsToRemove);
+
+    Assertions.assertEquals(2, tagEntities1.size());
+    Assertions.assertFalse(tagEntities1.contains(tagEntity1));
+    Assertions.assertTrue(tagEntities1.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities1.contains(tagEntity3));
+
+    // Test no tags to associate and disassociate
+    List<TagEntity> tagEntities2 =
+        tagMetaService.associateTagsWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), new NameIdentifier[0], 
new NameIdentifier[0]);
+    Assertions.assertEquals(2, tagEntities2.size());
+    Assertions.assertFalse(tagEntities2.contains(tagEntity1));
+    Assertions.assertTrue(tagEntities2.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities2.contains(tagEntity3));
+
+    // Test associate and disassociate same tags with metadata object
+    List<TagEntity> tagEntities3 =
+        tagMetaService.associateTagsWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), tagsToRemove, 
tagsToRemove);
+
+    Assertions.assertEquals(2, tagEntities3.size());
+    Assertions.assertFalse(tagEntities3.contains(tagEntity1));
+    Assertions.assertTrue(tagEntities3.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities3.contains(tagEntity3));
+
+    // Test associate and disassociate in-existent tags with metadata object
+    NameIdentifier[] tagsToAdd1 =
+        new NameIdentifier[] {
+          TagManager.ofTagIdent(metalakeName, "tag4"), 
TagManager.ofTagIdent(metalakeName, "tag5")
+        };
+
+    NameIdentifier[] tagsToRemove1 =
+        new NameIdentifier[] {
+          TagManager.ofTagIdent(metalakeName, "tag6"), 
TagManager.ofTagIdent(metalakeName, "tag7")
+        };
+
+    List<TagEntity> tagEntities4 =
+        tagMetaService.associateTagsWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), tagsToAdd1, 
tagsToRemove1);
+
+    Assertions.assertEquals(2, tagEntities4.size());
+    Assertions.assertTrue(tagEntities4.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities4.contains(tagEntity3));
+
+    // Test associate already associated tags with metadata object
+    Assertions.assertThrows(
+        EntityAlreadyExistsException.class,
+        () ->
+            tagMetaService.associateTagsWithMetadataObject(
+                catalog.nameIdentifier(), catalog.type(), tagsToAdd, new 
NameIdentifier[0]));
+
+    // Test disassociate already disassociated tags with metadata object
+    List<TagEntity> tagEntities5 =
+        tagMetaService.associateTagsWithMetadataObject(
+            catalog.nameIdentifier(), catalog.type(), new NameIdentifier[0], 
tagsToRemove);
+
+    Assertions.assertEquals(2, tagEntities5.size());
+    Assertions.assertTrue(tagEntities5.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities5.contains(tagEntity3));
+
+    // Test associate and disassociate with invalid metadata object
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            tagMetaService.associateTagsWithMetadataObject(
+                NameIdentifier.of(metalakeName, "non-existent-catalog"),
+                catalog.type(),
+                tagsToAdd,
+                tagsToRemove));
+
+    // Test associate and disassociate to a schema
+    List<TagEntity> tagEntities6 =
+        tagMetaService.associateTagsWithMetadataObject(
+            schema.nameIdentifier(), schema.type(), tagsToAdd, tagsToRemove);
+
+    Assertions.assertEquals(2, tagEntities6.size());
+    Assertions.assertTrue(tagEntities6.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities6.contains(tagEntity3));
+
+    // Test associate and disassociate to a table
+    List<TagEntity> tagEntities7 =
+        tagMetaService.associateTagsWithMetadataObject(
+            table.nameIdentifier(), table.type(), tagsToAdd, tagsToRemove);
+
+    Assertions.assertEquals(2, tagEntities7.size());
+    Assertions.assertTrue(tagEntities7.contains(tagEntity2));
+    Assertions.assertTrue(tagEntities7.contains(tagEntity3));
+  }
+
+  @Test
+  public void testListTagsForMetadataObject() throws IOException {
+    testAssociateAndDisassociateTagsWithMetadataObject();
+
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+
+    // Test list tags for catalog
+    List<TagEntity> tagEntities =
+        tagMetaService.listTagsForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1"), 
Entity.EntityType.CATALOG);
+    Assertions.assertEquals(2, tagEntities.size());
+    Assertions.assertTrue(
+        tagEntities.stream().anyMatch(tagEntity -> 
tagEntity.name().equals("tag2")));
+    Assertions.assertTrue(
+        tagEntities.stream().anyMatch(tagEntity -> 
tagEntity.name().equals("tag3")));
+
+    // Test list tags for schema
+    List<TagEntity> tagEntities1 =
+        tagMetaService.listTagsForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1"), 
Entity.EntityType.SCHEMA);
+
+    Assertions.assertEquals(2, tagEntities1.size());
+    Assertions.assertTrue(
+        tagEntities1.stream().anyMatch(tagEntity -> 
tagEntity.name().equals("tag2")));
+    Assertions.assertTrue(
+        tagEntities1.stream().anyMatch(tagEntity -> 
tagEntity.name().equals("tag3")));
+
+    // Test list tags for table
+    List<TagEntity> tagEntities2 =
+        tagMetaService.listTagsForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1", "table1"),
+            Entity.EntityType.TABLE);
+
+    Assertions.assertEquals(2, tagEntities2.size());
+    Assertions.assertTrue(
+        tagEntities2.stream().anyMatch(tagEntity -> 
tagEntity.name().equals("tag2")));
+    Assertions.assertTrue(
+        tagEntities2.stream().anyMatch(tagEntity -> 
tagEntity.name().equals("tag3")));
+
+    // Test list tags for non-existent metadata object
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            tagMetaService.listTagsForMetadataObject(
+                NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"table2"),
+                Entity.EntityType.TABLE));
+  }
+
+  @Test
+  public void testGetTagForMetadataObject() throws IOException {
+    testAssociateAndDisassociateTagsWithMetadataObject();
+
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+
+    // Test get tag for catalog
+    TagEntity tagEntity =
+        tagMetaService.getTagForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1"),
+            Entity.EntityType.CATALOG,
+            TagManager.ofTagIdent(metalakeName, "tag2"));
+    Assertions.assertEquals("tag2", tagEntity.name());
+
+    // Test get tag for schema
+    TagEntity tagEntity1 =
+        tagMetaService.getTagForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1"),
+            Entity.EntityType.SCHEMA,
+            TagManager.ofTagIdent(metalakeName, "tag3"));
+    Assertions.assertEquals("tag3", tagEntity1.name());
+
+    // Test get tag for table
+    TagEntity tagEntity2 =
+        tagMetaService.getTagForMetadataObject(
+            NameIdentifier.of(metalakeName, "catalog1", "schema1", "table1"),
+            Entity.EntityType.TABLE,
+            TagManager.ofTagIdent(metalakeName, "tag2"));
+    Assertions.assertEquals("tag2", tagEntity2.name());
+
+    // Test get tag for non-existent metadata object
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            tagMetaService.getTagForMetadataObject(
+                NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"table2"),
+                Entity.EntityType.TABLE,
+                TagManager.ofTagIdent(metalakeName, "tag2")));
+
+    // Test get tag for non-existent tag
+    Throwable e =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () ->
+                tagMetaService.getTagForMetadataObject(
+                    NameIdentifier.of(metalakeName, "catalog1", "schema1", 
"table1"),
+                    Entity.EntityType.TABLE,
+                    TagManager.ofTagIdent(metalakeName, "tag4")));
+    Assertions.assertTrue(e.getMessage().contains("No such tag entity: tag4"));
+  }
+
+  @Test
+  public void testListAssociatedMetadataObjectsForTag() throws IOException {
+    testAssociateAndDisassociateTagsWithMetadataObject();
+
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+
+    // Test list associated metadata objects for tag2
+    List<MetadataObject> metadataObjects =
+        tagMetaService.listAssociatedMetadataObjectsForTag(
+            TagManager.ofTagIdent(metalakeName, "tag2"));
+
+    Assertions.assertEquals(3, metadataObjects.size());
+    Assertions.assertTrue(
+        metadataObjects.contains(MetadataObjects.parse("catalog1", 
MetadataObject.Type.CATALOG)));
+    Assertions.assertTrue(
+        metadataObjects.contains(
+            MetadataObjects.parse("catalog1.schema1", 
MetadataObject.Type.SCHEMA)));
+    Assertions.assertTrue(
+        metadataObjects.contains(
+            MetadataObjects.parse("catalog1.schema1.table1", 
MetadataObject.Type.TABLE)));
+
+    // Test list associated metadata objects for tag3
+    List<MetadataObject> metadataObjects1 =
+        tagMetaService.listAssociatedMetadataObjectsForTag(
+            TagManager.ofTagIdent(metalakeName, "tag3"));
+
+    Assertions.assertEquals(3, metadataObjects1.size());
+    Assertions.assertTrue(
+        metadataObjects1.contains(MetadataObjects.parse("catalog1", 
MetadataObject.Type.CATALOG)));
+    Assertions.assertTrue(
+        metadataObjects1.contains(
+            MetadataObjects.parse("catalog1.schema1", 
MetadataObject.Type.SCHEMA)));
+    Assertions.assertTrue(
+        metadataObjects1.contains(
+            MetadataObjects.parse("catalog1.schema1.table1", 
MetadataObject.Type.TABLE)));
+
+    // Test list associated metadata objects for non-existent tag
+    List<MetadataObject> metadataObjects2 =
+        tagMetaService.listAssociatedMetadataObjectsForTag(
+            TagManager.ofTagIdent(metalakeName, "tag4"));
+    Assertions.assertEquals(0, metadataObjects2.size());
+
+    // Test metadata object non-exist scenario.
+    backend.delete(
+        NameIdentifier.of(metalakeName, "catalog1", "schema1", "table1"),
+        Entity.EntityType.TABLE,
+        false);
+
+    List<MetadataObject> metadataObjects3 =
+        tagMetaService.listAssociatedMetadataObjectsForTag(
+            TagManager.ofTagIdent(metalakeName, "tag2"));
+
+    Assertions.assertEquals(2, metadataObjects3.size());
+    Assertions.assertTrue(
+        metadataObjects3.contains(MetadataObjects.parse("catalog1", 
MetadataObject.Type.CATALOG)));
+    Assertions.assertTrue(
+        metadataObjects3.contains(
+            MetadataObjects.parse("catalog1.schema1", 
MetadataObject.Type.SCHEMA)));
+
+    backend.delete(
+        NameIdentifier.of(metalakeName, "catalog1", "schema1"), 
Entity.EntityType.SCHEMA, false);
+
+    List<MetadataObject> metadataObjects4 =
+        tagMetaService.listAssociatedMetadataObjectsForTag(
+            TagManager.ofTagIdent(metalakeName, "tag2"));
+
+    Assertions.assertEquals(1, metadataObjects4.size());
+    Assertions.assertTrue(
+        metadataObjects4.contains(MetadataObjects.parse("catalog1", 
MetadataObject.Type.CATALOG)));
+
+    backend.delete(NameIdentifier.of(metalakeName, "catalog1"), 
Entity.EntityType.CATALOG, false);
+
+    List<MetadataObject> metadataObjects5 =
+        tagMetaService.listAssociatedMetadataObjectsForTag(
+            TagManager.ofTagIdent(metalakeName, "tag2"));
+
+    Assertions.assertEquals(0, metadataObjects5.size());
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index a7b5e0855..ccf445c81 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.json.JsonUtils;
@@ -52,6 +53,7 @@ import 
org.apache.gravitino.storage.relational.po.FilesetVersionPO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.TagPO;
 import org.apache.gravitino.storage.relational.po.TopicPO;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -654,6 +656,21 @@ public class TestPOConverters {
     assertEquals("this is test2", updatePO.getComment());
   }
 
+  @Test
+  public void testTagMetadataObjectRelPO() {
+    TagMetadataObjectRelPO tagMetadataObjectRelPO =
+        POConverters.initializeTagMetadataObjectRelPOWithVersion(
+            1L, 1L, MetadataObject.Type.CATALOG.toString());
+    assertEquals(1L, tagMetadataObjectRelPO.getTagId());
+    assertEquals(1L, tagMetadataObjectRelPO.getMetadataObjectId());
+    assertEquals(
+        MetadataObject.Type.CATALOG.toString(), 
tagMetadataObjectRelPO.getMetadataObjectType());
+
+    assertEquals(1, tagMetadataObjectRelPO.getCurrentVersion());
+    assertEquals(1, tagMetadataObjectRelPO.getLastVersion());
+    assertEquals(0, tagMetadataObjectRelPO.getDeletedAt());
+  }
+
   private static BaseMetalake createMetalake(Long id, String name, String 
comment) {
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
diff --git a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java 
b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
index efd78a77a..3dc298cdb 100644
--- a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
+++ b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.tag;
 
+import static org.apache.gravitino.Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS;
 import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
 import static 
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
 import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL;
@@ -30,6 +31,8 @@ import static 
org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -43,19 +46,29 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchTagException;
+import org.apache.gravitino.exceptions.NotFoundException;
+import org.apache.gravitino.exceptions.TagAlreadyAssociatedException;
 import org.apache.gravitino.exceptions.TagAlreadyExistsException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -73,6 +86,12 @@ public class TestTagManager {
 
   private static final String METALAKE = "metalake_for_tag_test";
 
+  private static final String CATALOG = "catalog_for_tag_test";
+
+  private static final String SCHEMA = "schema_for_tag_test";
+
+  private static final String TABLE = "table_for_tag_test";
+
   private static EntityStore entityStore;
 
   private static IdGenerator idGenerator;
@@ -94,6 +113,7 @@ public class TestTagManager {
     
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
     Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 
1000L);
     Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
+    
Mockito.when(config.get(CATALOG_CACHE_EVICTION_INTERVAL_MS)).thenReturn(1000L);
 
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
@@ -115,7 +135,41 @@ public class TestTagManager {
             .build();
     entityStore.put(metalake, false /* overwritten */);
 
-    tagManager = new TagManager(idGenerator, entityStore);
+    CatalogEntity catalog =
+        CatalogEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(CATALOG)
+            .withNamespace(Namespace.of(METALAKE))
+            .withType(Catalog.Type.RELATIONAL)
+            .withProvider("test")
+            .withComment("Test catalog")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(catalog, false /* overwritten */);
+
+    SchemaEntity schema =
+        SchemaEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(SCHEMA)
+            .withNamespace(Namespace.of(METALAKE, CATALOG))
+            .withComment("Test schema")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(schema, false /* overwritten */);
+
+    TableEntity table =
+        TableEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(TABLE)
+            .withNamespace(Namespace.of(METALAKE, CATALOG, SCHEMA))
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(table, false /* overwritten */);
+
+    tagManager = spy(new TagManager(idGenerator, entityStore));
+    doReturn(true)
+        .when(tagManager)
+        .checkAndImportEntity(Mockito.any(), Mockito.any(), Mockito.any());
   }
 
   @AfterAll
@@ -130,6 +184,24 @@ public class TestTagManager {
 
   @AfterEach
   public void cleanUp() {
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    String[] catalogTags = tagManager.listTagsForMetadataObject(METALAKE, 
catalogObject);
+    tagManager.associateTagsForMetadataObject(METALAKE, catalogObject, null, 
catalogTags);
+
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    String[] schemaTags = tagManager.listTagsForMetadataObject(METALAKE, 
schemaObject);
+    tagManager.associateTagsForMetadataObject(METALAKE, schemaObject, null, 
schemaTags);
+
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+    String[] tableTags = tagManager.listTagsForMetadataObject(METALAKE, 
tableObject);
+    tagManager.associateTagsForMetadataObject(METALAKE, tableObject, null, 
tableTags);
+
     Arrays.stream(tagManager.listTags(METALAKE)).forEach(n -> 
tagManager.deleteTag(METALAKE, n));
   }
 
@@ -245,4 +317,285 @@ public class TestTagManager {
             () -> tagManager.deleteTag("non_existent_metalake", "tag1"));
     Assertions.assertEquals("Metalake non_existent_metalake does not exist", 
e.getMessage());
   }
+
+  @Test
+  public void testAssociateTagsForMetadataObject() {
+    Tag tag1 = tagManager.createTag(METALAKE, "tag1", null, null);
+    Tag tag2 = tagManager.createTag(METALAKE, "tag2", null, null);
+    Tag tag3 = tagManager.createTag(METALAKE, "tag3", null, null);
+
+    // Test associate tags for catalog
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    String[] tagsToAdd = new String[] {tag1.name(), tag2.name(), tag3.name()};
+
+    String[] tags =
+        tagManager.associateTagsForMetadataObject(METALAKE, catalogObject, 
tagsToAdd, null);
+
+    Assertions.assertEquals(3, tags.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1", "tag2", "tag3"), 
ImmutableSet.copyOf(tags));
+
+    // Test disassociate tags for catalog
+    String[] tagsToRemove = new String[] {tag1.name()};
+    String[] tags1 =
+        tagManager.associateTagsForMetadataObject(METALAKE, catalogObject, 
null, tagsToRemove);
+
+    Assertions.assertEquals(2, tags1.length);
+    Assertions.assertEquals(ImmutableSet.of("tag2", "tag3"), 
ImmutableSet.copyOf(tags1));
+
+    // Test associate and disassociate no tags for catalog
+    String[] tags2 = tagManager.associateTagsForMetadataObject(METALAKE, 
catalogObject, null, null);
+
+    Assertions.assertEquals(2, tags2.length);
+    Assertions.assertEquals(ImmutableSet.of("tag2", "tag3"), 
ImmutableSet.copyOf(tags2));
+
+    // Test re-associate tags for catalog
+    Throwable e =
+        Assertions.assertThrows(
+            TagAlreadyAssociatedException.class,
+            () ->
+                tagManager.associateTagsForMetadataObject(
+                    METALAKE, catalogObject, tagsToAdd, null));
+    Assertions.assertTrue(e.getMessage().contains("Failed to associate tags 
for metadata object"));
+
+    // Test associate and disassociate non-existent tags for catalog
+    String[] tags3 =
+        tagManager.associateTagsForMetadataObject(
+            METALAKE, catalogObject, new String[] {"tag4", "tag5"}, new 
String[] {"tag6"});
+
+    Assertions.assertEquals(2, tags3.length);
+    Assertions.assertEquals(ImmutableSet.of("tag2", "tag3"), 
ImmutableSet.copyOf(tags3));
+
+    // Test associate tags for non-existent metadata object
+    MetadataObject nonExistentObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, "non_existent_catalog"),
+            Entity.EntityType.CATALOG);
+    Throwable e1 =
+        Assertions.assertThrows(
+            NotFoundException.class,
+            () ->
+                tagManager.associateTagsForMetadataObject(
+                    METALAKE, nonExistentObject, tagsToAdd, null));
+    Assertions.assertTrue(e1.getMessage().contains("Failed to associate tags 
for metadata object"));
+
+    // Test associate tags for unsupported metadata object
+    MetadataObject metalakeObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofMetalake(METALAKE), 
Entity.EntityType.METALAKE);
+    Throwable e2 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                tagManager.associateTagsForMetadataObject(
+                    METALAKE, metalakeObject, tagsToAdd, null));
+    Assertions.assertTrue(
+        e2.getMessage().contains("Cannot associate tags for unsupported 
metadata object type"));
+
+    // Test associate tags for schema
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    String[] tags4 =
+        tagManager.associateTagsForMetadataObject(METALAKE, schemaObject, 
tagsToAdd, null);
+
+    Assertions.assertEquals(3, tags4.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1", "tag2", "tag3"), 
ImmutableSet.copyOf(tags4));
+
+    // Test associate tags for table
+    String[] tagsToAdd1 = new String[] {tag1.name()};
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+    String[] tags5 =
+        tagManager.associateTagsForMetadataObject(METALAKE, tableObject, 
tagsToAdd1, null);
+
+    Assertions.assertEquals(1, tags5.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1"), 
ImmutableSet.copyOf(tags5));
+
+    // Test associate and disassociate same tags for table
+    String[] tagsToAdd2 = new String[] {tag2.name(), tag3.name()};
+    String[] tagsToRemove1 = new String[] {tag2.name()};
+    String[] tags6 =
+        tagManager.associateTagsForMetadataObject(METALAKE, tableObject, 
tagsToAdd2, tagsToRemove1);
+
+    Assertions.assertEquals(2, tags6.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1", "tag3"), 
ImmutableSet.copyOf(tags6));
+  }
+
+  @Test
+  public void testListMetadataObjectsForTag() {
+    Tag tag1 = tagManager.createTag(METALAKE, "tag1", null, null);
+    Tag tag2 = tagManager.createTag(METALAKE, "tag2", null, null);
+    Tag tag3 = tagManager.createTag(METALAKE, "tag3", null, null);
+
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, catalogObject, new String[] {tag1.name(), tag2.name(), 
tag3.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, schemaObject, new String[] {tag1.name(), tag2.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, tableObject, new String[] {tag1.name()}, null);
+
+    MetadataObject[] objects = tagManager.listMetadataObjectsForTag(METALAKE, 
tag1.name());
+    Assertions.assertEquals(3, objects.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(catalogObject, schemaObject, tableObject), 
ImmutableSet.copyOf(objects));
+
+    MetadataObject[] objects1 = tagManager.listMetadataObjectsForTag(METALAKE, 
tag2.name());
+    Assertions.assertEquals(2, objects1.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(catalogObject, schemaObject), 
ImmutableSet.copyOf(objects1));
+
+    MetadataObject[] objects2 = tagManager.listMetadataObjectsForTag(METALAKE, 
tag3.name());
+    Assertions.assertEquals(1, objects2.length);
+    Assertions.assertEquals(ImmutableSet.of(catalogObject), 
ImmutableSet.copyOf(objects2));
+
+    // List metadata objects for non-existent tag
+    Throwable e =
+        Assertions.assertThrows(
+            NoSuchTagException.class,
+            () -> tagManager.listMetadataObjectsForTag(METALAKE, 
"non_existent_tag"));
+    Assertions.assertTrue(
+        e.getMessage()
+            .contains(
+                "Tag with name non_existent_tag under metalake " + METALAKE + 
" does not exist"));
+  }
+
+  @Test
+  public void testListTagsForMetadataObject() {
+    Tag tag1 = tagManager.createTag(METALAKE, "tag1", null, null);
+    Tag tag2 = tagManager.createTag(METALAKE, "tag2", null, null);
+    Tag tag3 = tagManager.createTag(METALAKE, "tag3", null, null);
+
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, catalogObject, new String[] {tag1.name(), tag2.name(), 
tag3.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, schemaObject, new String[] {tag1.name(), tag2.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, tableObject, new String[] {tag1.name()}, null);
+
+    String[] tags = tagManager.listTagsForMetadataObject(METALAKE, 
catalogObject);
+    Assertions.assertEquals(3, tags.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1", "tag2", "tag3"), 
ImmutableSet.copyOf(tags));
+
+    Tag[] tagsInfo = tagManager.listTagsInfoForMetadataObject(METALAKE, 
catalogObject);
+    Assertions.assertEquals(3, tagsInfo.length);
+    Assertions.assertEquals(ImmutableSet.of(tag1, tag2, tag3), 
ImmutableSet.copyOf(tagsInfo));
+
+    String[] tags1 = tagManager.listTagsForMetadataObject(METALAKE, 
schemaObject);
+    Assertions.assertEquals(2, tags1.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1", "tag2"), 
ImmutableSet.copyOf(tags1));
+
+    Tag[] tagsInfo1 = tagManager.listTagsInfoForMetadataObject(METALAKE, 
schemaObject);
+    Assertions.assertEquals(2, tagsInfo1.length);
+    Assertions.assertEquals(ImmutableSet.of(tag1, tag2), 
ImmutableSet.copyOf(tagsInfo1));
+
+    String[] tags2 = tagManager.listTagsForMetadataObject(METALAKE, 
tableObject);
+    Assertions.assertEquals(1, tags2.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1"), 
ImmutableSet.copyOf(tags2));
+
+    Tag[] tagsInfo2 = tagManager.listTagsInfoForMetadataObject(METALAKE, 
tableObject);
+    Assertions.assertEquals(1, tagsInfo2.length);
+    Assertions.assertEquals(ImmutableSet.of(tag1), 
ImmutableSet.copyOf(tagsInfo2));
+
+    // List tags for non-existent metadata object
+    MetadataObject nonExistentObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, "non_existent_catalog"),
+            Entity.EntityType.CATALOG);
+    Throwable e =
+        Assertions.assertThrows(
+            NotFoundException.class,
+            () -> tagManager.listTagsForMetadataObject(METALAKE, 
nonExistentObject));
+    Assertions.assertTrue(
+        e.getMessage().contains("Failed to list tags for metadata object " + 
nonExistentObject));
+  }
+
+  @Test
+  public void testGetTagForMetadataObject() {
+    Tag tag1 = tagManager.createTag(METALAKE, "tag1", null, null);
+    Tag tag2 = tagManager.createTag(METALAKE, "tag2", null, null);
+    Tag tag3 = tagManager.createTag(METALAKE, "tag3", null, null);
+
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, catalogObject, new String[] {tag1.name(), tag2.name(), 
tag3.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, schemaObject, new String[] {tag1.name(), tag2.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, tableObject, new String[] {tag1.name()}, null);
+
+    Tag result = tagManager.getTagForMetadataObject(METALAKE, catalogObject, 
tag1.name());
+    Assertions.assertEquals(tag1, result);
+
+    Tag result1 = tagManager.getTagForMetadataObject(METALAKE, schemaObject, 
tag1.name());
+    Assertions.assertEquals(tag1, result1);
+
+    Tag result2 = tagManager.getTagForMetadataObject(METALAKE, schemaObject, 
tag2.name());
+    Assertions.assertEquals(tag2, result2);
+
+    Tag result3 = tagManager.getTagForMetadataObject(METALAKE, catalogObject, 
tag3.name());
+    Assertions.assertEquals(tag3, result3);
+
+    // Test get non-existent tag for metadata object
+    Throwable e =
+        Assertions.assertThrows(
+            NoSuchTagException.class,
+            () -> tagManager.getTagForMetadataObject(METALAKE, catalogObject, 
"non_existent_tag"));
+    Assertions.assertTrue(e.getMessage().contains("Tag non_existent_tag does 
not exist"));
+
+    Throwable e1 =
+        Assertions.assertThrows(
+            NoSuchTagException.class,
+            () -> tagManager.getTagForMetadataObject(METALAKE, schemaObject, 
tag3.name()));
+    Assertions.assertTrue(e1.getMessage().contains("Tag tag3 does not exist"));
+
+    Throwable e2 =
+        Assertions.assertThrows(
+            NoSuchTagException.class,
+            () -> tagManager.getTagForMetadataObject(METALAKE, tableObject, 
tag2.name()));
+    Assertions.assertTrue(e2.getMessage().contains("Tag tag2 does not exist"));
+
+    // Test get tag for non-existent metadata object
+    MetadataObject nonExistentObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, "non_existent_catalog"),
+            Entity.EntityType.CATALOG);
+    Throwable e3 =
+        Assertions.assertThrows(
+            NotFoundException.class,
+            () -> tagManager.getTagForMetadataObject(METALAKE, 
nonExistentObject, tag1.name()));
+    Assertions.assertTrue(
+        e3.getMessage().contains("Failed to get tag for metadata object " + 
nonExistentObject));
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/utils/TestMetadataObjectUtil.java 
b/core/src/test/java/org/apache/gravitino/utils/TestMetadataObjectUtil.java
new file mode 100644
index 000000000..1de30d16f
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/utils/TestMetadataObjectUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.utils;
+
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMetadataObjectUtil {
+
+  @Test
+  public void testToEntityType() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjectUtil.toEntityType(null),
+        "metadataObject cannot be null");
+
+    Assertions.assertEquals(
+        Entity.EntityType.METALAKE,
+        MetadataObjectUtil.toEntityType(
+            MetadataObjects.of(null, "metalake", 
MetadataObject.Type.METALAKE)));
+
+    Assertions.assertEquals(
+        Entity.EntityType.CATALOG,
+        MetadataObjectUtil.toEntityType(
+            MetadataObjects.of(null, "catalog", MetadataObject.Type.CATALOG)));
+
+    Assertions.assertEquals(
+        Entity.EntityType.SCHEMA,
+        MetadataObjectUtil.toEntityType(
+            MetadataObjects.of("catalog", "schema", 
MetadataObject.Type.SCHEMA)));
+
+    Assertions.assertEquals(
+        Entity.EntityType.TABLE,
+        MetadataObjectUtil.toEntityType(
+            MetadataObjects.of("catalog.schema", "table", 
MetadataObject.Type.TABLE)));
+
+    Assertions.assertEquals(
+        Entity.EntityType.TOPIC,
+        MetadataObjectUtil.toEntityType(
+            MetadataObjects.of("catalog.schema", "topic", 
MetadataObject.Type.TOPIC)));
+
+    Assertions.assertEquals(
+        Entity.EntityType.FILESET,
+        MetadataObjectUtil.toEntityType(
+            MetadataObjects.of("catalog.schema", "fileset", 
MetadataObject.Type.FILESET)));
+
+    Assertions.assertEquals(
+        Entity.EntityType.COLUMN,
+        MetadataObjectUtil.toEntityType(
+            MetadataObjects.of("catalog.schema.table", "column", 
MetadataObject.Type.COLUMN)));
+  }
+
+  @Test
+  public void testToEntityIdent() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjectUtil.toEntityIdent(null, null),
+        "metadataName cannot be blank");
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjectUtil.toEntityIdent("metalake", null),
+        "metadataObject cannot be null");
+
+    Assertions.assertEquals(
+        NameIdentifier.of("metalake"),
+        MetadataObjectUtil.toEntityIdent(
+            "metalake", MetadataObjects.of(null, "metalake", 
MetadataObject.Type.METALAKE)));
+
+    Assertions.assertEquals(
+        NameIdentifier.of("metalake", "catalog"),
+        MetadataObjectUtil.toEntityIdent(
+            "metalake", MetadataObjects.of(null, "catalog", 
MetadataObject.Type.CATALOG)));
+
+    Assertions.assertEquals(
+        NameIdentifier.of("metalake", "catalog", "schema"),
+        MetadataObjectUtil.toEntityIdent(
+            "metalake", MetadataObjects.of("catalog", "schema", 
MetadataObject.Type.SCHEMA)));
+
+    Assertions.assertEquals(
+        NameIdentifier.of("metalake", "catalog", "schema", "table"),
+        MetadataObjectUtil.toEntityIdent(
+            "metalake", MetadataObjects.of("catalog.schema", "table", 
MetadataObject.Type.TABLE)));
+
+    Assertions.assertEquals(
+        NameIdentifier.of("metalake", "catalog", "schema", "topic"),
+        MetadataObjectUtil.toEntityIdent(
+            "metalake", MetadataObjects.of("catalog.schema", "topic", 
MetadataObject.Type.TOPIC)));
+
+    Assertions.assertEquals(
+        NameIdentifier.of("metalake", "catalog", "schema", "fileset"),
+        MetadataObjectUtil.toEntityIdent(
+            "metalake",
+            MetadataObjects.of("catalog.schema", "fileset", 
MetadataObject.Type.FILESET)));
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            MetadataObjectUtil.toEntityIdent(
+                "metalake",
+                MetadataObjects.of("catalog.schema.table", "column", 
MetadataObject.Type.COLUMN)),
+        "Cannot convert column metadata object to entity identifier: 
catalog.schema.table.column");
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java 
b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
index 487c0084d..964f910ba 100644
--- a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
+++ b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
@@ -18,9 +18,13 @@
  */
 package org.apache.gravitino.utils;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
 import org.apache.gravitino.exceptions.IllegalNamespaceException;
@@ -58,4 +62,66 @@ public class TestNameIdentifierUtil {
         assertThrows(IllegalNamespaceException.class, () -> 
NameIdentifierUtil.checkTable(abc));
     assertTrue(excep3.getMessage().contains("Table namespace must be non-null 
and have 3 levels"));
   }
+
+  @Test
+  public void testToMetadataObject() {
+    // test metalake
+    NameIdentifier metalake = NameIdentifier.of("metalake1");
+    MetadataObject metalakeObject =
+        MetadataObjects.parse("metalake1", MetadataObject.Type.METALAKE);
+    assertEquals(
+        metalakeObject, NameIdentifierUtil.toMetadataObject(metalake, 
Entity.EntityType.METALAKE));
+
+    // test catalog
+    NameIdentifier catalog = NameIdentifier.of("metalake1", "catalog1");
+    MetadataObject catalogObject = MetadataObjects.parse("catalog1", 
MetadataObject.Type.CATALOG);
+    assertEquals(
+        catalogObject, NameIdentifierUtil.toMetadataObject(catalog, 
Entity.EntityType.CATALOG));
+
+    // test schema
+    NameIdentifier schema = NameIdentifier.of("metalake1", "catalog1", 
"schema1");
+    MetadataObject schemaObject =
+        MetadataObjects.parse("catalog1.schema1", MetadataObject.Type.SCHEMA);
+    assertEquals(
+        schemaObject, NameIdentifierUtil.toMetadataObject(schema, 
Entity.EntityType.SCHEMA));
+
+    // test table
+    NameIdentifier table = NameIdentifier.of("metalake1", "catalog1", 
"schema1", "table1");
+    MetadataObject tableObject =
+        MetadataObjects.parse("catalog1.schema1.table1", 
MetadataObject.Type.TABLE);
+    assertEquals(tableObject, NameIdentifierUtil.toMetadataObject(table, 
Entity.EntityType.TABLE));
+
+    // test topic
+    NameIdentifier topic = NameIdentifier.of("metalake1", "catalog1", 
"schema1", "topic1");
+    MetadataObject topicObject =
+        MetadataObjects.parse("catalog1.schema1.topic1", 
MetadataObject.Type.TOPIC);
+    assertEquals(topicObject, NameIdentifierUtil.toMetadataObject(topic, 
Entity.EntityType.TOPIC));
+
+    // test fileset
+    NameIdentifier fileset = NameIdentifier.of("metalake1", "catalog1", 
"schema1", "fileset1");
+    MetadataObject filesetObject =
+        MetadataObjects.parse("catalog1.schema1.fileset1", 
MetadataObject.Type.FILESET);
+    assertEquals(
+        filesetObject, NameIdentifierUtil.toMetadataObject(fileset, 
Entity.EntityType.FILESET));
+
+    // test column
+    Throwable e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> NameIdentifierUtil.toMetadataObject(fileset, 
Entity.EntityType.COLUMN));
+    assertTrue(e.getMessage().contains("Entity type COLUMN is not supported"));
+
+    // test null
+    Throwable e1 =
+        assertThrows(
+            IllegalArgumentException.class, () -> 
NameIdentifierUtil.toMetadataObject(null, null));
+    assertTrue(e1.getMessage().contains("The identifier and entity type must 
not be null"));
+
+    // test unsupported
+    Throwable e2 =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> NameIdentifierUtil.toMetadataObject(fileset, 
Entity.EntityType.TAG));
+    assertTrue(e2.getMessage().contains("Entity type TAG is not supported"));
+  }
 }
diff --git a/scripts/h2/schema-h2.sql b/scripts/h2/schema-h2.sql
index f4c961cb6..0ebf91fa3 100644
--- a/scripts/h2/schema-h2.sql
+++ b/scripts/h2/schema-h2.sql
@@ -241,7 +241,7 @@ CREATE TABLE IF NOT EXISTS `tag_relation_meta` (
     `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation last 
version',
     `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag relation 
deleted at',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `uk_ti_mi_del` (`tag_id`, `metadata_object_id`, `deleted_at`),
+    UNIQUE KEY `uk_ti_mi_mo_del` (`tag_id`, `metadata_object_id`, 
`metadata_object_type`, `deleted_at`),
     KEY `idx_tid` (`tag_id`),
     KEY `idx_mid` (`metadata_object_id`)
     ) ENGINE=InnoDB;
diff --git a/scripts/mysql/schema-0.6.0-mysql.sql 
b/scripts/mysql/schema-0.6.0-mysql.sql
index 8418a5602..b50609da3 100644
--- a/scripts/mysql/schema-0.6.0-mysql.sql
+++ b/scripts/mysql/schema-0.6.0-mysql.sql
@@ -234,7 +234,7 @@ CREATE TABLE IF NOT EXISTS `tag_relation_meta` (
     `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation last 
version',
     `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag relation 
deleted at',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `uk_ti_mi_del` (`tag_id`, `metadata_object_id`, `deleted_at`),
+    UNIQUE KEY `uk_ti_mi_mo_del` (`tag_id`, `metadata_object_id`, 
`metadata_object_type`, `deleted_at`),
     KEY `idx_tid` (`tag_id`),
     KEY `idx_mid` (`metadata_object_id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'tag 
metadata object relation';
diff --git a/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql 
b/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql
index 8fb71f730..3048c67ab 100644
--- a/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql
@@ -60,7 +60,7 @@ CREATE TABLE IF NOT EXISTS `tag_relation_meta` (
     `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation last 
version',
     `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag relation 
deleted at',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `uk_ti_mi_del` (`tag_id`, `metadata_object_id`, `deleted_at`),
+    UNIQUE KEY `uk_ti_mi_mo_del` (`tag_id`, `metadata_object_id`, 
`metadata_object_type`, `deleted_at`),
     KEY `idx_tid` (`tag_id`),
     KEY `idx_mid` (`metadata_object_id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'tag 
metadata object relation';

Reply via email to