This is an automated email from the ASF dual-hosted git repository.

jerryshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 584fcd3896 [#10772] feat(authz): Eventual-consistency invalidation for 
JcasbinAuthorizer caches (#11117)
584fcd3896 is described below

commit 584fcd3896354f75dd6d3a8add3961bdf9625eb2
Author: Qi Yu <[email protected]>
AuthorDate: Wed May 20 14:16:38 2026 +0800

    [#10772] feat(authz): Eventual-consistency invalidation for 
JcasbinAuthorizer caches (#11117)
    
    ### What changes were proposed in this pull request?
    
    Introduces the HA invalidation infrastructure that the JcasbinAuthorizer
    cache refactor (#10772) needs as a prerequisite. The existing
    role-loading path stays on its current TTL-only behavior; the
    version-validated user / group / role caches and the JcasbinRoleLoader
    ride on top of this PR in a follow-up.
    
    **New classes:**
    
    - JcasbinAuthorizationLookups: two-tier metadata-id and owner lookup
    facade (per-request dedup via AuthorizationRequestContext, shared
    Caffeine-backed GravitinoCache fallback, DB on miss). Owners are now
    fetched directly from owner_meta via OwnerMetaMapper instead of the
    OWNER_REL relation query, so the cache key space matches the
    entity-change-log key space.
    - JcasbinChangePoller: scheduled thread that drains entity_change_log
    and owner_meta change rows since a high-water cursor and invalidates the
    affected cache keys. Documents the id-cursor in-flight-commit trade-off
    and the writer-side pre-mutation-name contract.
    
    **Modified:**
    
    - JcasbinAuthorizer: wires the two GravitinoCaches + lookups + poller in
    initialize/close; routes isOwner, OWNER-privilege authorization, and
    handleMetadataOwnerChange through the lookups; drops the in-class
    OwnerInfo (replaced by
    org.apache.gravitino.storage.relational.po.auth.OwnerInfo) and collapses
    loadOwnerPolicy + checkOwnership into ownerMatchesUserOrGroups;
    implements handleEntityNameIdMappingChange.
    - GravitinoAuthorizer: adds the handleEntityNameIdMappingChange default
    method.
    - Configs: adds metadataIdCacheSize and changePollIntervalSecs.
    
    ### Why are the changes needed?
    
    This is a prerequisite split of #10996 (tracking issue #10772) to keep
    that follow-up PR scoped to the role-loading rewrite alone.
    
    Fix: #10772
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. New configs ship with safe defaults and existing behavior is
    preserved for the role-loading path.
    
    ### How was this patch tested?
    
    - TestJcasbinAuthorizer: existing cases (owner mocks adjusted to the new
    OwnerMetaMapper.selectOwnerByMetadataObjectIdAndType path).
    - TestJcasbinAuthorizationLookups: buildCacheKey / isContainerType /
    cascade-prefix hierarchy.
    - TestJcasbinChangePoller: metadataObjectFromChangeLog builds cache keys
    matching the writer-side full-name format.
    
    ```
    ./gradlew :server-common:test --tests 
org.apache.gravitino.server.authorization.jcasbin.TestJcasbinChangePoller 
--tests org.apache.gravitino.server.authorization.jcasbin.TestJcasbinAuthorizer
    ./gradlew :core:test --tests 
org.apache.gravitino.authorization.TestAuthorizationUtils
    ```
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI 
<[email protected]>
---
 .../main/java/org/apache/gravitino/Configs.java    |  19 ++
 .../authorization/AuthorizationUtils.java          |  16 ++
 .../authorization/GravitinoAuthorizer.java         |  21 +-
 .../gravitino/authorization/OwnerManager.java      |  95 +++---
 .../gravitino/cache/CaffeineGravitinoCache.java    | 104 ++++++-
 .../org/apache/gravitino/cache/GravitinoCache.java |  42 ++-
 .../gravitino/cache/NoOpsGravitinoCache.java       |   7 +
 .../authorization/TestAuthorizationUtils.java      |  28 ++
 .../gravitino/authorization/TestOwnerManager.java  |  41 +++
 .../apache/gravitino/cache/TestGravitinoCache.java | 165 +++++++++++
 docs/security/access-control.md                    |   8 +
 .../jcasbin/JcasbinAuthorizationLookups.java       | 149 ++++++++++
 .../authorization/jcasbin/JcasbinAuthorizer.java   | 214 +++++++-------
 .../authorization/jcasbin/JcasbinChangePoller.java | 320 +++++++++++++++++++++
 .../jcasbin/TestJcasbinAuthorizationLookups.java   | 207 +++++++++++++
 .../jcasbin/TestJcasbinAuthorizer.java             | 230 +++++++++++----
 .../jcasbin/TestJcasbinChangePoller.java           | 182 ++++++++++++
 17 files changed, 1655 insertions(+), 193 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 2f09aa54c8..1c50d74402 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -330,6 +330,25 @@ public class Configs {
           .longConf()
           .createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_OWNER_CACHE_SIZE);
 
+  public static final long 
DEFAULT_GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE = 100000L;
+
+  public static final ConfigEntry<Long> 
GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE =
+      new ConfigBuilder("gravitino.authorization.jcasbin.metadataIdCacheSize")
+          .doc("The maximum size of the metadata-id cache for authorization")
+          .version(ConfigConstants.VERSION_1_3_0)
+          .longConf()
+          
.createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE);
+
+  public static final long 
DEFAULT_GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS = 3L;
+
+  public static final ConfigEntry<Long> 
GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS =
+      new 
ConfigBuilder("gravitino.authorization.jcasbin.changePollIntervalSecs")
+          .doc("The interval in seconds for polling entity and owner changes")
+          .version(ConfigConstants.VERSION_1_3_0)
+          .longConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          
.createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS);
+
   public static final ConfigEntry<List<String>> SERVICE_ADMINS =
       new ConfigBuilder("gravitino.authorization.serviceAdmins")
           .doc("The admins of Gravitino service")
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 16642a5855..2f5518ad84 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -366,6 +366,7 @@ public class AuthorizationUtils {
     // If we enable authorization, we should rename the privileges about the 
entity in the
     // authorization plugin.
     if (GravitinoEnv.getInstance().accessControlDispatcher() != null) {
+      notifyEntityNameIdMappingChange(ident, type);
       MetadataObject oldMetadataObject = 
NameIdentifierUtil.toMetadataObject(ident, type);
       MetadataObject newMetadataObject =
           
NameIdentifierUtil.toMetadataObject(NameIdentifier.of(ident.namespace(), 
newName), type);
@@ -386,6 +387,21 @@ public class AuthorizationUtils {
     }
   }
 
+  private static void notifyEntityNameIdMappingChange(
+      NameIdentifier ident, Entity.EntityType type) {
+    GravitinoAuthorizer gravitinoAuthorizer = 
GravitinoEnv.getInstance().gravitinoAuthorizer();
+    if (gravitinoAuthorizer == null) {
+      return;
+    }
+    String metalake =
+        type == Entity.EntityType.METALAKE ? ident.name() : 
ident.namespace().level(0);
+    try {
+      gravitinoAuthorizer.handleEntityNameIdMappingChange(metalake, ident, 
type);
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to notify entity name-id mapping change for {}", ident, 
e);
+    }
+  }
+
   public static Role filterSecurableObjects(
       RoleEntity role, String metalakeName, String catalogName) {
     List<SecurableObject> securableObjects = role.securableObjects();
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
index 6ab74382ad..12529f74c6 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
@@ -19,6 +19,7 @@ package org.apache.gravitino.authorization;
 
 import java.io.Closeable;
 import java.security.Principal;
+import javax.annotation.Nullable;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
@@ -160,10 +161,26 @@ public interface GravitinoAuthorizer extends Closeable {
    * changes.
    *
    * @param metalake metalake;
-   * @param oldOwnerId The old owner id;
+   * @param oldOwnerId The old owner id; null when setting the first owner.
    * @param nameIdentifier The metadata name identifier;
    * @param type entity type
    */
   void handleMetadataOwnerChange(
-      String metalake, Long oldOwnerId, NameIdentifier nameIdentifier, 
Entity.EntityType type);
+      String metalake,
+      @Nullable Long oldOwnerId,
+      NameIdentifier nameIdentifier,
+      Entity.EntityType type);
+
+  /**
+   * Called when an entity name-to-id mapping may have changed because of a 
rename or drop.
+   * Implementations evict the cache key for the given entity and all of its 
descendants.
+   *
+   * @param metalake the metalake name
+   * @param nameIdentifier the entity name identifier
+   * @param type the entity type
+   */
+  default void handleEntityNameIdMappingChange(
+      String metalake, NameIdentifier nameIdentifier, Entity.EntityType type) {
+    // default no-op for backward compatibility
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
index 70e10f1488..9e17ef64ec 100644
--- a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
+++ b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.authorization;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import javax.annotation.Nullable;
 import lombok.Getter;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
@@ -118,7 +119,7 @@ public class OwnerManager implements OwnerDispatcher {
           metadataObject,
           authorizationPlugin ->
               authorizationPlugin.onOwnerSet(metadataObject, 
originOwner.orElse(null), newOwner));
-      originOwner.ifPresent(owner -> notifyOwnerChange(owner, metalake, 
metadataObject));
+      notifyOwnerChange(originOwner.orElse(null), metalake, metadataObject);
     } catch (NoSuchEntityException nse) {
       LOG.warn(
           "Metadata object {} or owner {} is not found", 
metadataObject.fullName(), ownerName, nse);
@@ -134,41 +135,65 @@ public class OwnerManager implements OwnerDispatcher {
     }
   }
 
-  private void notifyOwnerChange(Owner oldOwner, String metalake, 
MetadataObject metadataObject) {
+  private void notifyOwnerChange(
+      @Nullable Owner oldOwner, String metalake, MetadataObject 
metadataObject) {
     GravitinoAuthorizer gravitinoAuthorizer = 
GravitinoEnv.getInstance().gravitinoAuthorizer();
-    if (gravitinoAuthorizer != null) {
-      try {
-        Long oldOwnerId;
-        if (oldOwner.type() == Owner.Type.USER) {
-          UserEntity userEntity =
-              GravitinoEnv.getInstance()
-                  .entityStore()
-                  .get(
-                      NameIdentifierUtil.ofUser(metalake, oldOwner.name()),
-                      Entity.EntityType.USER,
-                      UserEntity.class);
-          oldOwnerId = userEntity.id();
-        } else if (oldOwner.type() == Owner.Type.GROUP) {
-          GroupEntity groupEntity =
-              GravitinoEnv.getInstance()
-                  .entityStore()
-                  .get(
-                      NameIdentifierUtil.ofGroup(metalake, oldOwner.name()),
-                      Entity.EntityType.GROUP,
-                      GroupEntity.class);
-          oldOwnerId = groupEntity.id();
-        } else {
-          LOG.warn("Unsupported owner type: {}", oldOwner.type());
-          return;
-        }
-        gravitinoAuthorizer.handleMetadataOwnerChange(
-            metalake,
-            oldOwnerId,
-            MetadataObjectUtil.toEntityIdent(metalake, metadataObject),
-            Entity.EntityType.valueOf(metadataObject.type().name()));
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-      }
+    if (gravitinoAuthorizer == null) {
+      return;
+    }
+
+    Long oldOwnerId;
+    try {
+      oldOwnerId = oldOwner == null ? null : getOwnerId(metalake, oldOwner);
+    } catch (IOException e) {
+      LOG.warn(
+          "Failed to resolve previous owner id for {} (metalake={}, 
oldOwner={}); "
+              + "cache invalidation for this change may be skipped on this 
node",
+          metadataObject.fullName(),
+          metalake,
+          oldOwner,
+          e);
+      return;
+    }
+
+    try {
+      gravitinoAuthorizer.handleMetadataOwnerChange(
+          metalake,
+          oldOwnerId,
+          MetadataObjectUtil.toEntityIdent(metalake, metadataObject),
+          Entity.EntityType.valueOf(metadataObject.type().name()));
+    } catch (RuntimeException e) {
+      // Best-effort hook: a failing authorizer must not fail the owner-change 
operation itself.
+      LOG.warn(
+          "Authorizer hook failed for owner change on {} (metalake={}, 
oldOwnerId={})",
+          metadataObject.fullName(),
+          metalake,
+          oldOwnerId,
+          e);
+    }
+  }
+
+  private Long getOwnerId(String metalake, Owner owner) throws IOException {
+    if (owner.type() == Owner.Type.USER) {
+      UserEntity userEntity =
+          GravitinoEnv.getInstance()
+              .entityStore()
+              .get(
+                  NameIdentifierUtil.ofUser(metalake, owner.name()),
+                  Entity.EntityType.USER,
+                  UserEntity.class);
+      return userEntity.id();
+    } else if (owner.type() == Owner.Type.GROUP) {
+      GroupEntity groupEntity =
+          GravitinoEnv.getInstance()
+              .entityStore()
+              .get(
+                  NameIdentifierUtil.ofGroup(metalake, owner.name()),
+                  Entity.EntityType.GROUP,
+                  GroupEntity.class);
+      return groupEntity.id();
+    } else {
+      throw new IllegalArgumentException("Unsupported owner type: " + 
owner.type());
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java 
b/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java
index f1ed54df75..2154dea3dc 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java
@@ -22,13 +22,23 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Ticker;
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * A Caffeine-backed implementation of {@link GravitinoCache}. Supports 
configurable TTL and maximum
  * size.
  *
+ * <p>A fair {@link ReentrantReadWriteLock} serialises invalidations against 
reads/puts. Reads,
+ * loader-backed gets, and puts share the read lock; {@code invalidate*} 
operations take the write
+ * lock. See the field-level Javadoc on {@link #lock} for the races this 
guards against and the
+ * cost/benefit reasoning. Callers that need a multi-step invalidation to 
appear atomic wrap their
+ * sequence in {@link #runInvalidationBatch(Runnable)}.
+ *
  * @param <K> the key type
  * @param <V> the value type
  */
@@ -36,6 +46,37 @@ public class CaffeineGravitinoCache<K, V> implements 
GravitinoCache<K, V> {
 
   private final Cache<K, V> cache;
 
+  /**
+   * Serialises invalidations against reads/puts so a batched invalidation 
never appears
+   * half-applied to concurrent readers. Two related races motivate this:
+   *
+   * <ol>
+   *   <li><b>Single-node read/invalidate race (addressed here).</b> When the 
change poller drains a
+   *       batch of stale keys and calls {@link #invalidate} per key, a reader 
can interleave
+   *       between two invalidations and observe a half-applied batch — some 
entries already
+   *       evicted, others still hot. The exclusive write lock makes each 
batch atomic from a
+   *       reader's point of view: readers see either the pre-batch state or 
the post-batch state,
+   *       never a mix. Callers that need multi-step atomicity wrap their 
sequence in {@link
+   *       #runInvalidationBatch(Runnable)}.
+   *   <li><b>Multi-node staleness window (NOT addressed here, and no lock 
can).</b> After a commit
+   *       on node A, node B keeps returning stale data until B's poller runs 
(~poll interval, on
+   *       the order of seconds). The race in (1) is a ~ms slice of that 
larger window and
+   *       self-heals once the batch completes — the affected keys are gone 
from the cache, so the
+   *       next read pulls fresh data. Callers that need read-after-write 
consistency must bypass
+   *       the cache; nothing on node B can make node A's commit visible 
sooner.
+   * </ol>
+   *
+   * <p>Cost: the poller runs every few seconds with small batches, so 
contention is rare; a fair RW
+   * lock under no contention is on the order of nanoseconds. Dropping the 
lock would be defensible
+   * — race (2) dominates the staleness budget — but keeping it makes the 
cache's invalidation
+   * semantics easy to reason about and test, at a cost we won't measure on 
the hot path.
+   *
+   * <p>Fair mode prevents writer starvation: a parked {@code invalidate*} 
call queues new readers
+   * behind it instead of letting them slip in ahead. Reads sit on the 
authorization hot path and
+   * would otherwise starve the rarer invalidations.
+   */
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
   /**
    * Creates a new CaffeineGravitinoCache with the given TTL and maximum size.
    *
@@ -57,35 +98,57 @@ public class CaffeineGravitinoCache<K, V> implements 
GravitinoCache<K, V> {
 
   @Override
   public Optional<V> getIfPresent(K key) {
-    V value = cache.getIfPresent(key);
-    return Optional.ofNullable(value);
+    return withReadLock(() -> Optional.ofNullable(cache.getIfPresent(key)));
+  }
+
+  @Override
+  public V get(K key, Function<K, V> loader) {
+    return withReadLock(
+        () ->
+            cache.get(
+                key,
+                k -> Objects.requireNonNull(loader.apply(k), "Cache loader 
must not return null")));
   }
 
   @Override
   public void put(K key, V value) {
-    cache.put(key, value);
+    // Puts run under the shared read lock so that they may proceed 
concurrently with reads and
+    // with other puts; only invalidations need exclusive access.
+    withReadLock(() -> cache.put(key, value));
   }
 
   @Override
   public void invalidate(K key) {
-    cache.invalidate(key);
+    withWriteLock(() -> cache.invalidate(key));
   }
 
   @Override
   public void invalidateAll() {
-    cache.invalidateAll();
+    withWriteLock(cache::invalidateAll);
   }
 
   @Override
   public void invalidateByPrefix(String prefix) {
     // Prefix invalidation scans all keys. It is intended for infrequent 
structural invalidations
     // such as dropping or renaming an entity hierarchy, not for per-request 
hot paths.
-    cache.asMap().keySet().removeIf(k -> k instanceof String && ((String) 
k).startsWith(prefix));
+    withWriteLock(
+        () ->
+            cache
+                .asMap()
+                .keySet()
+                .removeIf(k -> k instanceof String && ((String) 
k).startsWith(prefix)));
+  }
+
+  @Override
+  public void runInvalidationBatch(Runnable batch) {
+    // The write lock is reentrant, so individual invalidate* calls inside the 
batch re-acquire it
+    // without deadlocking.
+    withWriteLock(batch);
   }
 
   @Override
   public long size() {
-    return cache.estimatedSize();
+    return withReadLock(cache::estimatedSize);
   }
 
   @VisibleForTesting
@@ -98,4 +161,31 @@ public class CaffeineGravitinoCache<K, V> implements 
GravitinoCache<K, V> {
     cache.invalidateAll();
     cache.cleanUp();
   }
+
+  private <T> T withReadLock(Supplier<T> action) {
+    lock.readLock().lock();
+    try {
+      return action.get();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  private void withReadLock(Runnable action) {
+    lock.readLock().lock();
+    try {
+      action.run();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  private void withWriteLock(Runnable action) {
+    lock.writeLock().lock();
+    try {
+      action.run();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java 
b/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java
index 3771608e3c..7eaf839b50 100644
--- a/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java
@@ -19,7 +19,9 @@
 package org.apache.gravitino.cache;
 
 import java.io.Closeable;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
 
 /**
  * A general-purpose cache interface used by the authorization subsystem. 
Implementations include a
@@ -38,6 +40,26 @@ public interface GravitinoCache<K, V> extends Closeable {
    */
   Optional<V> getIfPresent(K key);
 
+  /**
+   * Returns the value associated with the key, loading it if necessary. 
Implementations with real
+   * storage should make the load atomic for the same key, and should 
serialise concurrent reads
+   * against any {@link #invalidate(Object) invalidate} call so a reader 
cannot observe a partially
+   * invalidated cache.
+   *
+   * @param key the cache key
+   * @param loader the loader invoked when the key is absent
+   * @return the cached or loaded value
+   */
+  default V get(K key, Function<K, V> loader) {
+    Optional<V> value = getIfPresent(key);
+    if (value.isPresent()) {
+      return value.get();
+    }
+    V loaded = Objects.requireNonNull(loader.apply(key), "Cache loader must 
not return null");
+    put(key, loaded);
+    return loaded;
+  }
+
   /**
    * Associates the value with the key in the cache.
    *
@@ -58,13 +80,29 @@ public interface GravitinoCache<K, V> extends Closeable {
 
   /**
    * Evicts all entries whose key is a String and starts with the given 
prefix. Only meaningful when
-   * K = String. Used by metadataIdCache for hierarchical cascade 
invalidation: dropping a catalog
-   * evicts the catalog entry plus all schema/table/fileset/... entries 
beneath it.
+   * K = String. Used by metadataIdCache for path-based invalidation: dropping 
a catalog evicts the
+   * catalog entry plus all schema/table/fileset/... entries under that 
catalog name path.
    *
    * @param prefix the prefix to match against key strings
    */
   void invalidateByPrefix(String prefix);
 
+  /**
+   * Runs {@code batch} so that all {@link #invalidate}, {@link 
#invalidateAll}, and {@link
+   * #invalidateByPrefix} calls it makes appear atomic to concurrent readers — 
i.e. readers see
+   * either the state from before the batch or the state after, never an 
intermediate half-applied
+   * state.
+   *
+   * <p>Default implementation: just runs the runnable, suitable for caches 
without real storage
+   * (e.g. {@link NoOpsGravitinoCache}). Storage-backed implementations should 
hold an exclusive
+   * lock for the duration of the batch.
+   *
+   * @param batch the invalidation sequence to run atomically
+   */
+  default void runInvalidationBatch(Runnable batch) {
+    batch.run();
+  }
+
   /**
    * Returns the approximate number of entries in the cache.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java 
b/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java
index 8c41eabeef..8d7bc53edd 100644
--- a/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java
@@ -18,7 +18,9 @@
  */
 package org.apache.gravitino.cache;
 
+import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
 
 /**
  * A no-op implementation of {@link GravitinoCache} that never caches 
anything. Useful for testing
@@ -34,6 +36,11 @@ public class NoOpsGravitinoCache<K, V> implements 
GravitinoCache<K, V> {
     return Optional.empty();
   }
 
+  @Override
+  public V get(K key, Function<K, V> loader) {
+    return Objects.requireNonNull(loader.apply(key), "Cache loader must not 
return null");
+  }
+
   @Override
   public void put(K key, V value) {
     // no-op
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
index 3071e23f9a..e27b68bb91 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
@@ -31,8 +31,10 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.catalog.CatalogDispatcher;
+import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.catalog.SchemaDispatcher;
 import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
 import org.apache.gravitino.exceptions.IllegalNamespaceException;
 import org.apache.gravitino.meta.AuditInfo;
@@ -40,6 +42,7 @@ import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.rel.Table;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
 class TestAuthorizationUtils {
@@ -289,4 +292,29 @@ class TestAuthorizationUtils {
     Assertions.assertEquals(1, locations.size());
     Assertions.assertEquals("schemaLocation", locations.get(0));
   }
+
+  @Test
+  void testRenamePrivilegesNotifiesOldEntityNameIdMappingChange() {
+    GravitinoAuthorizer authorizer = Mockito.mock(GravitinoAuthorizer.class);
+    AccessControlDispatcher accessControlDispatcher = 
Mockito.mock(AccessControlDispatcher.class);
+    CatalogManager catalogManager = Mockito.mock(CatalogManager.class);
+    BaseCatalog<?> baseCatalog = Mockito.mock(BaseCatalog.class);
+    
Mockito.when(catalogManager.loadCatalog(Mockito.any())).thenReturn(baseCatalog);
+
+    GravitinoEnv envMock = Mockito.mock(GravitinoEnv.class);
+    Mockito.when(envMock.gravitinoAuthorizer()).thenReturn(authorizer);
+    
Mockito.when(envMock.accessControlDispatcher()).thenReturn(accessControlDispatcher);
+    Mockito.when(envMock.catalogManager()).thenReturn(catalogManager);
+
+    try (MockedStatic<GravitinoEnv> envStatic = 
Mockito.mockStatic(GravitinoEnv.class)) {
+      envStatic.when(GravitinoEnv::getInstance).thenReturn(envMock);
+
+      NameIdentifier ident = NameIdentifier.of("metalake", "catalog", 
"schema", "table");
+      AuthorizationUtils.authorizationPluginRenamePrivileges(
+          ident, Entity.EntityType.TABLE, "new_table");
+
+      Mockito.verify(authorizer)
+          .handleEntityNameIdMappingChange("metalake", ident, 
Entity.EntityType.TABLE);
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java 
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
index 56dd78f797..5226e5d13d 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
@@ -43,14 +43,17 @@ import java.util.Collections;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
@@ -59,6 +62,7 @@ import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.UserEntity;
@@ -243,4 +247,41 @@ public class TestOwnerManager {
             ownerManager.setOwner(
                 METALAKE, metalakeObject, "non-existent-group", 
Owner.Type.GROUP));
   }
+
+  @Test
+  @Order(4)
+  public void testInitialOwnerSetNotifiesAuthorizer() throws 
IllegalAccessException, IOException {
+    String catalogName = "catalog_initial_owner_notify";
+    AuditInfo audit = 
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+    CatalogEntity catalog =
+        CatalogEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(catalogName)
+            .withNamespace(Namespace.of(METALAKE))
+            .withType(Catalog.Type.RELATIONAL)
+            .withProvider("test")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(catalog, false);
+
+    GravitinoAuthorizer authorizer = Mockito.mock(GravitinoAuthorizer.class);
+    GravitinoAuthorizer originalAuthorizer = 
GravitinoEnv.getInstance().gravitinoAuthorizer();
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "gravitinoAuthorizer", 
authorizer, true);
+    try {
+      MetadataObject catalogObject =
+          MetadataObjects.of(Lists.newArrayList(catalogName), 
MetadataObject.Type.CATALOG);
+
+      ownerManager.setOwner(METALAKE, catalogObject, USER, Owner.Type.USER);
+
+      Mockito.verify(authorizer)
+          .handleMetadataOwnerChange(
+              Mockito.eq(METALAKE),
+              Mockito.isNull(),
+              Mockito.eq(NameIdentifier.of(METALAKE, catalogName)),
+              Mockito.eq(Entity.EntityType.CATALOG));
+    } finally {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "gravitinoAuthorizer", 
originalAuthorizer, true);
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java 
b/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java
index 132e142002..8cbf4e7232 100644
--- a/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java
+++ b/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java
@@ -19,7 +19,13 @@
 package org.apache.gravitino.cache;
 
 import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.awaitility.Awaitility;
@@ -53,6 +59,148 @@ public class TestGravitinoCache {
     }
   }
 
+  @Test
+  void testCaffeineGetLoadsSameKeyAtomically() throws Exception {
+    CaffeineGravitinoCache<String, Long> cache = new 
CaffeineGravitinoCache<>(60_000L, 1000L);
+    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    try {
+      AtomicLong loadCount = new AtomicLong();
+      CountDownLatch ready = new CountDownLatch(8);
+      CountDownLatch start = new CountDownLatch(1);
+      List<Future<Long>> futures = new ArrayList<>();
+
+      for (int i = 0; i < 8; i++) {
+        futures.add(
+            executorService.submit(
+                () -> {
+                  ready.countDown();
+                  start.await();
+                  return cache.get(
+                      "shared",
+                      key -> {
+                        loadCount.incrementAndGet();
+                        return 100L;
+                      });
+                }));
+      }
+
+      Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS));
+      start.countDown();
+      for (Future<Long> future : futures) {
+        Assertions.assertEquals(100L, future.get(5, TimeUnit.SECONDS));
+      }
+
+      Assertions.assertEquals(1L, loadCount.get());
+      Assertions.assertEquals(1L, cache.size());
+    } finally {
+      executorService.shutdownNow();
+      cache.close();
+    }
+  }
+
+  @Test
+  void testCaffeineInvalidateWaitsForInFlightReader() throws Exception {
+    CaffeineGravitinoCache<String, Long> cache = new 
CaffeineGravitinoCache<>(60_000L, 1000L);
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    try {
+      CountDownLatch readerHoldsLock = new CountDownLatch(1);
+      CountDownLatch readerMayProceed = new CountDownLatch(1);
+
+      Future<Long> reader =
+          executorService.submit(
+              () ->
+                  cache.get(
+                      "k",
+                      key -> {
+                        readerHoldsLock.countDown();
+                        try {
+                          Assertions.assertTrue(readerMayProceed.await(5, 
TimeUnit.SECONDS));
+                        } catch (InterruptedException e) {
+                          Thread.currentThread().interrupt();
+                          throw new RuntimeException(e);
+                        }
+                        return 42L;
+                      }));
+
+      Assertions.assertTrue(readerHoldsLock.await(5, TimeUnit.SECONDS));
+      Future<?> invalidator = executorService.submit(() -> 
cache.invalidate("k"));
+
+      // Invalidator should be parked on the write lock until the reader 
releases its read lock.
+      Thread.sleep(150);
+      Assertions.assertFalse(
+          invalidator.isDone(), "invalidate must not proceed while a reader 
holds the read lock");
+
+      readerMayProceed.countDown();
+      Assertions.assertEquals(42L, reader.get(5, TimeUnit.SECONDS));
+      invalidator.get(5, TimeUnit.SECONDS);
+
+      // Reader installed the value, invalidator then removed it.
+      Assertions.assertFalse(cache.getIfPresent("k").isPresent());
+    } finally {
+      executorService.shutdownNow();
+      cache.close();
+    }
+  }
+
+  @Test
+  void testCaffeineRunInvalidationBatchIsAtomicForReaders() throws Exception {
+    CaffeineGravitinoCache<String, Long> cache = new 
CaffeineGravitinoCache<>(60_000L, 1000L);
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    try {
+      cache.put("k1", 1L);
+      cache.put("k2", 2L);
+      cache.put("k3", 3L);
+
+      CountDownLatch batchEntered = new CountDownLatch(1);
+      CountDownLatch batchMayProceed = new CountDownLatch(1);
+
+      Future<?> batcher =
+          executorService.submit(
+              () ->
+                  cache.runInvalidationBatch(
+                      () -> {
+                        batchEntered.countDown();
+                        try {
+                          Assertions.assertTrue(batchMayProceed.await(5, 
TimeUnit.SECONDS));
+                        } catch (InterruptedException e) {
+                          Thread.currentThread().interrupt();
+                          throw new RuntimeException(e);
+                        }
+                        cache.invalidate("k1");
+                        cache.invalidate("k2");
+                        cache.invalidate("k3");
+                      }));
+
+      Assertions.assertTrue(batchEntered.await(5, TimeUnit.SECONDS));
+
+      // Reader started while the batcher holds the write lock must block 
until the batch ends.
+      Future<Optional<Long>> reader = executorService.submit(() -> 
cache.getIfPresent("k2"));
+      Thread.sleep(150);
+      Assertions.assertFalse(reader.isDone(), "reader must wait for the batch 
to release the lock");
+
+      batchMayProceed.countDown();
+      batcher.get(5, TimeUnit.SECONDS);
+
+      // Reader observes the post-batch state, never a partially-invalidated 
cache.
+      Assertions.assertFalse(reader.get(5, TimeUnit.SECONDS).isPresent());
+      Assertions.assertEquals(0, cache.size());
+    } finally {
+      executorService.shutdownNow();
+      cache.close();
+    }
+  }
+
+  @Test
+  void testCaffeineGetRejectsNullLoadResult() {
+    CaffeineGravitinoCache<String, Long> cache = new 
CaffeineGravitinoCache<>(60_000L, 1000L);
+    try {
+      Assertions.assertThrows(NullPointerException.class, () -> 
cache.get("key", key -> null));
+      Assertions.assertFalse(cache.getIfPresent("key").isPresent());
+    } finally {
+      cache.close();
+    }
+  }
+
   @Test
   void testCaffeineInvalidate() {
     CaffeineGravitinoCache<String, String> cache = new 
CaffeineGravitinoCache<>(60_000L, 1000L);
@@ -172,6 +320,23 @@ public class TestGravitinoCache {
     }
   }
 
+  @Test
+  void testNoOpsGetAlwaysLoadsAndDoesNotCache() {
+    NoOpsGravitinoCache<String, Long> cache = new NoOpsGravitinoCache<>();
+    try {
+      AtomicLong loadCount = new AtomicLong();
+
+      Assertions.assertEquals(1L, cache.get("key", key -> 
loadCount.incrementAndGet()));
+      Assertions.assertEquals(2L, cache.get("key", key -> 
loadCount.incrementAndGet()));
+
+      Assertions.assertEquals(2L, loadCount.get());
+      Assertions.assertFalse(cache.getIfPresent("key").isPresent());
+      Assertions.assertEquals(0, cache.size());
+    } finally {
+      cache.close();
+    }
+  }
+
   @Test
   void testCaffeineWithNonStringKeys() {
     CaffeineGravitinoCache<Long, String> cache = new 
CaffeineGravitinoCache<>(60_000L, 1000L);
diff --git a/docs/security/access-control.md b/docs/security/access-control.md
index 76858304c7..4bbb274e1e 100644
--- a/docs/security/access-control.md
+++ b/docs/security/access-control.md
@@ -451,6 +451,8 @@ To enable access control in Gravitino, configure the 
following settings in your
 | `gravitino.authorization.jcasbin.cacheExpirationSecs`   | The expiration 
time in seconds for authorization cache entries            | `3600`        | No 
                                         | 1.1.1         |
 | `gravitino.authorization.jcasbin.roleCacheSize`         | The maximum size 
of the role cache for authorization                      | `10000`       | No   
                                       | 1.1.1         |
 | `gravitino.authorization.jcasbin.ownerCacheSize`        | The maximum size 
of the owner cache for authorization                     | `100000`      | No   
                                       | 1.1.1         |
+| `gravitino.authorization.jcasbin.metadataIdCacheSize`   | The maximum size 
of the metadata ID cache for authorization               | `100000`      | No   
                                       | 1.3.0         |
+| `gravitino.authorization.jcasbin.changePollIntervalSecs` | The interval in 
seconds for polling entity and owner changes              | `3`           | No  
                                        | 1.3.0         |
 
 ### Authorization Cache
 
@@ -462,6 +464,10 @@ Gravitino uses Caffeine caches to improve authorization 
performance by caching r
 
 - **`ownerCacheSize`**: Controls the maximum number of owner relationship 
entries that can be cached. This cache maps metadata object IDs to their owner 
IDs.
 
+- **`metadataIdCacheSize`**: Controls the maximum number of metadata 
name-to-ID mapping entries that can be cached. This cache maps metadata object 
names to internal metadata IDs used by JCasbin authorization checks.
+
+- **`changePollIntervalSecs`**: Controls how often a Gravitino server polls 
persisted entity and owner changes to invalidate local JCasbin authorization 
caches in multi-node deployments.
+
 :::info
 When role privileges or ownership are changed through the Gravitino API, the 
corresponding cache entries are automatically invalidated to ensure 
authorization decisions reflect the latest state.
 :::
@@ -488,6 +494,8 @@ gravitino.authorization.serviceAdmins = admin1,admin2
 gravitino.authorization.jcasbin.cacheExpirationSecs = 3600
 gravitino.authorization.jcasbin.roleCacheSize = 10000
 gravitino.authorization.jcasbin.ownerCacheSize = 100000
+gravitino.authorization.jcasbin.metadataIdCacheSize = 100000
+gravitino.authorization.jcasbin.changePollIntervalSecs = 3
 ```
 
 ## Migration Guide
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizationLookups.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizationLookups.java
new file mode 100644
index 0000000000..f9171bd72e
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizationLookups.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.server.authorization.MetadataIdConverter;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+
+/**
+ * Two-tier metadata-id and owner resolution for {@link JcasbinAuthorizer}.
+ *
+ * <p>Each lookup is deduplicated within a single request via {@link 
AuthorizationRequestContext},
+ * falls back to a shared {@link GravitinoCache} on a request miss, and 
finally issues a single DB
+ * query on a cache miss. A successful DB fetch populates both tiers so 
subsequent calls — in this
+ * request and later ones — hit the cache. The two underlying caches are 
invalidated externally by
+ * {@link JcasbinChangePoller} (HA peers) and by the {@link
+ * 
org.apache.gravitino.authorization.GravitinoAuthorizer#handleMetadataOwnerChange}
 / {@link
+ * 
org.apache.gravitino.authorization.GravitinoAuthorizer#handleEntityNameIdMappingChange}
 hooks
+ * (local mutations).
+ */
+public class JcasbinAuthorizationLookups {
+
+  /** Unit Separator for internal path-based cache keys. */
+  static final String KEY_SEP = "\u001F";
+
+  private final GravitinoCache<String, Long> metadataIdCache;
+  private final GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache;
+
+  /**
+   * Creates a new lookups facade around the supplied caches. The caches are 
owned by the caller and
+   * remain accessible for invalidation by other components (poller, change 
hooks).
+   *
+   * @param metadataIdCache path-based {@code 
metalake::catalog::schema::object::TYPE} → entity id
+   * @param ownerRelCache {@code metadataObjectId} → {@link Optional} of 
{@link OwnerInfo}
+   */
+  public JcasbinAuthorizationLookups(
+      GravitinoCache<String, Long> metadataIdCache,
+      GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache) {
+    this.metadataIdCache = metadataIdCache;
+    this.ownerRelCache = ownerRelCache;
+  }
+
+  /**
+   * Two-tier name→id lookup: the per-request map in {@code requestContext} 
dedups calls within the
+   * same HTTP request; on a miss, the long-lived {@code metadataIdCache} is 
consulted, and finally
+   * we fall back to a DB query via {@link MetadataIdConverter#getID}.
+   */
+  public Long resolveMetadataId(
+      MetadataObject metadataObject, String metalake, 
AuthorizationRequestContext requestContext) {
+    String cacheKey = buildCacheKey(metalake, metadataObject);
+    return requestContext.computeMetadataIdIfAbsent(
+        cacheKey,
+        k ->
+            metadataIdCache.get(k, ignored -> 
MetadataIdConverter.getID(metadataObject, metalake)));
+  }
+
+  /**
+   * Two-tier owner lookup: request-level dedup first, then the shared {@code 
ownerRelCache}, and
+   * finally a single {@code owner_meta} query. A successful DB fetch 
populates both tiers so
+   * subsequent {@code isOwner} calls — in this request and later ones — hit 
the cache.
+   */
+  public Optional<OwnerInfo> resolveOwnerId(
+      Long metadataId,
+      MetadataObject.Type metadataType,
+      AuthorizationRequestContext requestContext) {
+    return requestContext.computeOwnerIfAbsent(
+        metadataId,
+        id ->
+            ownerRelCache.get(
+                id,
+                ignored -> {
+                  OwnerInfo ownerInfo =
+                      SessionUtils.getWithoutCommit(
+                          OwnerMetaMapper.class,
+                          m -> m.selectOwnerByMetadataObjectIdAndType(id, 
metadataType.name()));
+                  return ownerInfo == null ? Optional.empty() : 
Optional.of(ownerInfo);
+                }));
+  }
+
+  /** Underlying metadata-id cache; exposed for invalidation by the change 
hooks and the poller. */
+  public GravitinoCache<String, Long> metadataIdCache() {
+    return metadataIdCache;
+  }
+
+  /** Underlying owner cache; exposed for invalidation by the change hooks and 
the poller. */
+  public GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache() {
+    return ownerRelCache;
+  }
+
+  /**
+   * Builds a path-based cache key for the metadataIdCache. Container objects 
end with the internal
+   * separator so a prefix invalidation can remove the container and all 
entries under the same name
+   * path.
+   *
+   * <p>Examples: {@code metalake<sep>}, {@code metalake<sep>catalog<sep>}, 
{@code
+   * metalake<sep>catalog<sep>schema<sep>}, {@code
+   * metalake<sep>catalog<sep>schema<sep>table<sep>TABLE}.
+   */
+  @VisibleForTesting
+  public static String buildCacheKey(String metalake, MetadataObject 
metadataObject) {
+    if (metadataObject.type() == MetadataObject.Type.METALAKE) {
+      return metalake + KEY_SEP;
+    }
+    StringBuilder sb = new StringBuilder(metalake);
+    sb.append(KEY_SEP);
+    // fullName uses '.' as separator, e.g. "catalog1.schema1.table1"
+    String[] parts = metadataObject.fullName().split("\\.");
+    sb.append(String.join(KEY_SEP, parts));
+    if (isContainerType(metadataObject.type())) {
+      // Trailing separator enables prefix-based invalidation.
+      sb.append(KEY_SEP);
+    } else {
+      // Leaf nodes get the type suffix to avoid collisions
+      sb.append(KEY_SEP);
+      sb.append(metadataObject.type().name());
+    }
+    return sb.toString();
+  }
+
+  /** Returns true for entity types that can contain children (metalake, 
catalog, schema). */
+  @VisibleForTesting
+  public static boolean isContainerType(MetadataObject.Type type) {
+    return type == MetadataObject.Type.METALAKE
+        || type == MetadataObject.Type.CATALOG
+        || type == MetadataObject.Type.SCHEMA;
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 3778f8b19f..2929bf7415 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -56,12 +56,14 @@ import 
org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.cache.CaffeineGravitinoCache;
+import org.apache.gravitino.cache.GravitinoCache;
 import org.apache.gravitino.exceptions.NoSuchUserException;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
-import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.casbin.jcasbin.main.Enforcer;
@@ -93,7 +95,17 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
    */
   private Cache<Long, Boolean> loadedRoles;
 
-  private Cache<Long, Optional<OwnerInfo>> ownerRel;
+  /** Path-based {@code metalake::catalog::schema::object::TYPE} → entity id. 
*/
+  private GravitinoCache<String, Long> metadataIdCache;
+
+  /** {@code metadataObjectId} → {@link Optional} of {@link OwnerInfo}. */
+  private GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache;
+
+  /** Two-tier lookup facade (per-request dedup + shared cache + DB fallback). 
*/
+  private JcasbinAuthorizationLookups lookups;
+
+  /** Background HA invalidator for {@link #metadataIdCache} and {@link 
#ownerRelCache}. */
+  private JcasbinChangePoller changePoller;
 
   private Executor executor = null;
 
@@ -107,6 +119,16 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         
GravitinoEnv.getInstance().config().get(Configs.GRAVITINO_AUTHORIZATION_ROLE_CACHE_SIZE);
     long ownerCacheSize =
         
GravitinoEnv.getInstance().config().get(Configs.GRAVITINO_AUTHORIZATION_OWNER_CACHE_SIZE);
+    long metadataIdCacheSize =
+        GravitinoEnv.getInstance()
+            .config()
+            .get(Configs.GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE);
+    long pollIntervalSecs =
+        GravitinoEnv.getInstance()
+            .config()
+            .get(Configs.GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS);
+
+    long ttlMs = TimeUnit.SECONDS.toMillis(cacheExpirationSecs);
 
     // Initialize enforcers before the caches that reference them in removal 
listeners
     allowEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new 
GravitinoAdapter());
@@ -127,11 +149,14 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
                   }
                 })
             .build();
-    ownerRel =
-        Caffeine.newBuilder()
-            .expireAfterAccess(cacheExpirationSecs, TimeUnit.SECONDS)
-            .maximumSize(ownerCacheSize)
-            .build();
+    // The change poller is the primary HA invalidation path. These 
write-based TTLs bound the
+    // stale window if a poll cycle misses a change; access-based TTLs could 
keep hot stale entries
+    // alive indefinitely.
+    metadataIdCache = new CaffeineGravitinoCache<>(ttlMs, metadataIdCacheSize);
+    ownerRelCache = new CaffeineGravitinoCache<>(ttlMs, ownerCacheSize);
+    lookups = new JcasbinAuthorizationLookups(metadataIdCache, ownerRelCache);
+    changePoller = new JcasbinChangePoller(metadataIdCache, ownerRelCache, 
pollIntervalSecs);
+    changePoller.start();
     executor =
         Executors.newFixedThreadPool(
             GravitinoEnv.getInstance()
@@ -226,9 +251,10 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       AuthorizationRequestContext requestContext) {
     boolean result;
     try {
-      Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-      loadOwnerPolicy(metalake, metadataObject, metadataId);
-      result = checkOwnership(principal, metalake, metadataId);
+      Long metadataId = lookups.resolveMetadataId(metadataObject, metalake, 
requestContext);
+      Optional<OwnerInfo> owner =
+          lookups.resolveOwnerId(metadataId, metadataObject.type(), 
requestContext);
+      result = ownerMatchesUserOrGroups(owner, principal, metalake);
     } catch (Exception e) {
       LOG.debug("Can not get entity id", e);
       result = false;
@@ -421,18 +447,55 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
   public void handleMetadataOwnerChange(
       String metalake, Long oldOwnerId, NameIdentifier nameIdentifier, 
Entity.EntityType type) {
     MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(nameIdentifier, type);
-    Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-    ownerRel.invalidate(metadataId);
+    // Owner mutations may happen after drop/recreate with the same name. 
Invalidate the
+    // name->id mapping as well to prevent using a stale metadataId from 
metadataIdCache.
+    
metadataIdCache.invalidate(JcasbinAuthorizationLookups.buildCacheKey(metalake, 
metadataObject));
+    try {
+      Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+      ownerRelCache.invalidate(metadataId);
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to resolve metadata id for owner cache invalidation: 
{}", metadataObject, e);
+    }
+  }
+
+  @Override
+  public void handleEntityNameIdMappingChange(
+      String metalake, NameIdentifier nameIdentifier, Entity.EntityType type) {
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(nameIdentifier, type);
+    String cacheKey = JcasbinAuthorizationLookups.buildCacheKey(metalake, 
metadataObject);
+    if (JcasbinAuthorizationLookups.isContainerType(metadataObject.type())) {
+      // Prefix invalidation: metalake::catalog:: removes catalog + all 
children.
+      metadataIdCache.invalidateByPrefix(cacheKey);
+    } else {
+      metadataIdCache.invalidate(cacheKey);
+    }
   }
 
   @Override
   public void close() throws IOException {
+    if (changePoller != null) {
+      changePoller.close();
+    }
     if (executor != null) {
       if (executor instanceof ThreadPoolExecutor) {
         ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
         threadPoolExecutor.shutdown();
+        try {
+          if (!threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+            threadPoolExecutor.shutdownNow();
+          }
+        } catch (InterruptedException e) {
+          threadPoolExecutor.shutdownNow();
+          Thread.currentThread().interrupt();
+        }
       }
     }
+    if (metadataIdCache != null) {
+      metadataIdCache.close();
+    }
+    if (ownerRelCache != null) {
+      ownerRelCache.close();
+    }
   }
 
   private class InternalAuthorizer {
@@ -464,13 +527,14 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       try {
         UserEntity userEntity = getUserEntity(username, metalake);
         userId = userEntity.id();
-        metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+        metadataId = lookups.resolveMetadataId(metadataObject, metalake, 
requestContext);
       } catch (Exception e) {
         LOG.debug("Can not get entity id", e);
         return false;
       }
       loadRolePrivilege(metalake, username, userId, requestContext);
-      return authorizeByJcasbin(userId, metalake, metadataObject, metadataId, 
privilege);
+      return authorizeByJcasbin(
+          userId, metalake, metadataObject, metadataId, privilege, 
requestContext);
     }
 
     private boolean authorizeByJcasbin(
@@ -478,9 +542,12 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         String metalake,
         MetadataObject metadataObject,
         Long metadataId,
-        String privilege) {
+        String privilege,
+        AuthorizationRequestContext requestContext) {
       if (AuthConstants.OWNER.equals(privilege)) {
-        return checkOwnership(PrincipalUtils.getCurrentPrincipal(), metalake, 
metadataId);
+        Optional<OwnerInfo> owner =
+            lookups.resolveOwnerId(metadataId, metadataObject.type(), 
requestContext);
+        return ownerMatchesUserOrGroups(owner, 
PrincipalUtils.getCurrentPrincipal(), metalake);
       }
       return enforcer.enforce(
           String.valueOf(userId),
@@ -627,43 +694,6 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     loadRoleFutures.add(loadRoleFuture);
   }
 
-  private void loadOwnerPolicy(String metalake, MetadataObject metadataObject, 
Long metadataId) {
-    if (ownerRel.getIfPresent(metadataId) != null) {
-      LOG.debug("Metadata {} OWNER has been loaded.", metadataId);
-      return;
-    }
-    try {
-      NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
-      EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
-      List<? extends Entity> owners =
-          entityStore
-              .relationOperations()
-              .listEntitiesByRelation(
-                  SupportsRelationOperations.Type.OWNER_REL,
-                  entityIdent,
-                  Entity.EntityType.valueOf(metadataObject.type().name()));
-      if (owners.isEmpty()) {
-        ownerRel.put(metadataId, Optional.empty());
-      } else {
-        for (Entity ownerEntity : owners) {
-          if (ownerEntity instanceof UserEntity) {
-            UserEntity user = (UserEntity) ownerEntity;
-            ownerRel.put(
-                metadataId,
-                Optional.of(new OwnerInfo(user.id(), Entity.EntityType.USER, 
user.name())));
-          } else if (ownerEntity instanceof GroupEntity) {
-            GroupEntity group = (GroupEntity) ownerEntity;
-            ownerRel.put(
-                metadataId,
-                Optional.of(new OwnerInfo(group.id(), Entity.EntityType.GROUP, 
group.name())));
-          }
-        }
-      }
-    } catch (IOException e) {
-      LOG.warn("Can not load metadata owner", e);
-    }
-  }
-
   private void loadPolicyByRoleEntity(RoleEntity roleEntity) {
     String metalake = 
NameIdentifierUtil.getMetalake(roleEntity.nameIdentifier());
     List<SecurableObject> securableObjects = roleEntity.securableObjects();
@@ -702,63 +732,49 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
   }
 
   /**
-   * Checks whether the given principal is the owner of the metadata object 
identified by
-   * metadataId. Supports both user and group ownership.
+   * Returns true when the resolved owner matches the current user (by id) or 
one of the user's
+   * groups. We compare by entity id rather than name so the cached snapshot 
survives a
+   * delete-then-recreate-with-same-name scenario without spuriously granting 
ownership to the new
+   * entity.
    */
-  private boolean checkOwnership(Principal principal, String metalake, Long 
metadataId) {
-    Optional<OwnerInfo> ownerOpt = ownerRel.getIfPresent(metadataId);
-    if (ownerOpt == null || !ownerOpt.isPresent()) {
+  private boolean ownerMatchesUserOrGroups(
+      Optional<OwnerInfo> owner, Principal principal, String metalake) {
+    if (!owner.isPresent()) {
       return false;
     }
-    OwnerInfo owner = ownerOpt.get();
-    // We compare by entity ID rather than name to guard against stale cache 
entries.
-    // If a user/group is deleted and recreated with the same name, the cached 
OwnerInfo
-    // still holds the old ID. A name-only comparison would incorrectly grant 
ownership
-    // to the new entity. The extra IO to fetch the current entity ensures 
correctness.
-    if (owner.type == Entity.EntityType.USER) {
+    OwnerInfo ownerInfo = owner.get();
+    if 
(Entity.EntityType.USER.name().equalsIgnoreCase(ownerInfo.getOwnerType())) {
       try {
         UserEntity userEntity = getUserEntity(principal.getName(), metalake);
-        return Objects.equals(userEntity.id(), owner.id);
+        return Objects.equals(userEntity.id(), ownerInfo.getOwnerId());
       } catch (Exception e) {
         LOG.debug("Can not get user entity for ownership check", e);
         return false;
       }
-    } else if (owner.type == Entity.EntityType.GROUP) {
-      if (principal instanceof UserPrincipal) {
-        List<UserGroup> groups = ((UserPrincipal) principal).getGroups();
-        if (groups.isEmpty()) {
-          return false;
-        }
-        try {
-          List<NameIdentifier> groupIdents =
-              groups.stream()
-                  .map(g -> NameIdentifierUtil.ofGroup(metalake, 
g.getGroupname()))
-                  .collect(Collectors.toList());
-          List<GroupEntity> groupEntities =
-              GravitinoEnv.getInstance()
-                  .entityStore()
-                  .batchGet(groupIdents, Entity.EntityType.GROUP, 
GroupEntity.class);
-          return groupEntities.stream().anyMatch(ge -> Objects.equals(ge.id(), 
owner.id));
-        } catch (Exception e) {
-          LOG.debug("Can not get group entities for ownership check", e);
-          return false;
-        }
-      }
+    }
+    if 
(!Entity.EntityType.GROUP.name().equalsIgnoreCase(ownerInfo.getOwnerType())) {
       return false;
     }
-    return false;
-  }
-
-  /** Holds the owner identity for a metadata object in the owner cache. */
-  static class OwnerInfo {
-    final Long id;
-    final Entity.EntityType type;
-    final String name;
-
-    OwnerInfo(Long id, Entity.EntityType type, String name) {
-      this.id = id;
-      this.type = type;
-      this.name = name;
+    if (!(principal instanceof UserPrincipal)) {
+      return false;
+    }
+    List<UserGroup> groups = ((UserPrincipal) principal).getGroups();
+    if (groups.isEmpty()) {
+      return false;
+    }
+    try {
+      List<NameIdentifier> groupIdents =
+          groups.stream()
+              .map(g -> NameIdentifierUtil.ofGroup(metalake, g.getGroupname()))
+              .collect(Collectors.toList());
+      List<GroupEntity> groupEntities =
+          GravitinoEnv.getInstance()
+              .entityStore()
+              .batchGet(groupIdents, Entity.EntityType.GROUP, 
GroupEntity.class);
+      return groupEntities.stream().anyMatch(ge -> Objects.equals(ge.id(), 
ownerInfo.getOwnerId()));
+    } catch (Exception e) {
+      LOG.debug("Can not get group entities for ownership check", e);
+      return false;
     }
   }
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinChangePoller.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinChangePoller.java
new file mode 100644
index 0000000000..611cc9616b
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinChangePoller.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.po.auth.ChangedOwnerInfo;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.apache.gravitino.storage.relational.po.cache.EntityChangeRecord;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Eventual-consistency invalidator for {@link JcasbinAuthorizer}'s {@code 
metadataIdCache} and
+ * {@code ownerRelCache}.
+ *
+ * <p>One scheduled thread drains {@code entity_change_log} and {@code 
owner_meta} change rows since
+ * a high-water-mark cursor and invalidates the affected keys. Other Gravitino 
nodes therefore
+ * observe ALTER/DROP and owner changes within one poll interval.
+ *
+ * <p>Both polls run on every tick — a failure in one does not stop the other.
+ */
+public class JcasbinChangePoller implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JcasbinChangePoller.class);
+
+  /** Max entity-change rows to fetch per poller cycle. */
+  private static final int ENTITY_CHANGE_POLLER_MAX_ROWS = 500;
+
+  private final GravitinoCache<String, Long> metadataIdCache;
+  private final GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache;
+  private final long pollIntervalSecs;
+
+  private ScheduledExecutorService scheduler;
+  private volatile long ownerPollHighWaterId = 0;
+  private volatile long entityPollHighWaterId = 0;
+
+  /**
+   * @param metadataIdCache the metadata-id cache to invalidate on entity 
changes
+   * @param ownerRelCache the owner cache to invalidate on owner changes
+   * @param pollIntervalSecs interval between successive polling cycles
+   */
+  public JcasbinChangePoller(
+      GravitinoCache<String, Long> metadataIdCache,
+      GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache,
+      long pollIntervalSecs) {
+    Preconditions.checkArgument(pollIntervalSecs > 0, "pollIntervalSecs must 
be positive");
+    this.metadataIdCache = metadataIdCache;
+    this.ownerRelCache = ownerRelCache;
+    this.pollIntervalSecs = pollIntervalSecs;
+  }
+
+  /**
+   * Initializes the high-water cursors to the current DB tail (so startup 
does not scan historical
+   * changes) and schedules periodic polling.
+   *
+   * <p>Known trade-off: an id-based high-water mark can miss rows whose id is 
allocated before the
+   * cursor snapshot but whose commit lands after it. Concretely, if writer A 
holds {@code id=N-1}
+   * uncommitted while writer B commits {@code id=N}, {@code 
selectMaxChangeId()} returns N and the
+   * next poll queries {@code id > N} — A's row is never consumed. In that 
case the affected cache
+   * entry stays stale until either (a) a request-side path catches it on the 
next request, or (b)
+   * TTL eviction. Acceptable for the eventual-consistency caches targeted 
here; revisit if we ever
+   * route strong-consistency data through this poller.
+   */
+  public void start() {
+    ownerPollHighWaterId =
+        getOrDefault(
+            SessionUtils.getWithoutCommit(
+                OwnerMetaMapper.class, OwnerMetaMapper::selectMaxChangeId));
+    entityPollHighWaterId =
+        getOrDefault(
+            SessionUtils.getWithoutCommit(
+                EntityChangeLogMapper.class, 
EntityChangeLogMapper::selectMaxChangeId));
+
+    scheduler =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread t = new Thread(r);
+              t.setName("GravitinoAuthorizer-ChangePoller");
+              t.setDaemon(true);
+              return t;
+            });
+    scheduler.scheduleWithFixedDelay(
+        this::pollChanges, pollIntervalSecs, pollIntervalSecs, 
TimeUnit.SECONDS);
+  }
+
+  @VisibleForTesting
+  void pollChanges() {
+    try {
+      LOG.debug("Polling for owner changes after id {}", ownerPollHighWaterId);
+      pollOwnerChanges();
+    } catch (Exception e) {
+      if (handleInterruptIfAny(e, "Owner change poll")) {
+        return;
+      }
+      LOG.warn("Owner change poll failed", e);
+    }
+
+    try {
+      LOG.debug("Polling for entity changes after id {}", 
entityPollHighWaterId);
+      pollEntityChanges();
+    } catch (Exception e) {
+      if (handleInterruptIfAny(e, "Entity change poll")) {
+        return;
+      }
+      LOG.warn("Entity change poll failed", e);
+    }
+  }
+
+  /**
+   * Restores the interrupt flag and returns true if {@code e} carries (or the 
current thread has
+   * accumulated) an interruption, so the caller can bail out of this poll 
cycle quickly during
+   * {@link #close()}-driven {@code shutdownNow()}.
+   */
+  private static boolean handleInterruptIfAny(Throwable e, String context) {
+    Throwable t = e;
+    while (t != null) {
+      if (t instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+        LOG.debug("{} interrupted, stopping poll cycle", context);
+        return true;
+      }
+      t = t.getCause();
+    }
+    if (Thread.currentThread().isInterrupted()) {
+      LOG.debug("{} ran while thread was interrupted, stopping poll cycle", 
context);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Drains owner-change rows past {@link #ownerPollHighWaterId} and 
invalidates the affected {@code
+   * ownerRelCache} entries. Each row carries {@code metadataObjectId}, so 
invalidation is a direct
+   * key removal — no name resolution needed.
+   *
+   * <p>The {@code synchronized} modifier is defensive. In production this 
method is only invoked
+   * from the single-threaded scheduler started in {@link #start()}, and {@link
+   * java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay} 
guarantees that
+   * consecutive runs do not overlap. The cursor field {@link 
#ownerPollHighWaterId} is also {@code
+   * volatile}, and cache invalidations are now atomic at the cache layer via 
{@link
+   * org.apache.gravitino.cache.GravitinoCache#runInvalidationBatch}. The 
keyword is kept so that
+   * future callers — additional schedulers, ad-hoc invocations from tests or 
admin tooling — do not
+   * silently introduce concurrent {@code "select changes → invalidate → 
advance cursor"} sequences.
+   * Cost is negligible under no contention thanks to biased / elided locking.
+   */
+  private synchronized void pollOwnerChanges() {
+    List<ChangedOwnerInfo> changes =
+        SessionUtils.getWithoutCommit(
+            OwnerMetaMapper.class, m -> 
m.selectChangedOwners(ownerPollHighWaterId));
+    if (changes.isEmpty()) {
+      return;
+    }
+
+    long[] maxSeenId = {ownerPollHighWaterId};
+    // Hold the cache's exclusive invalidation lock for the whole batch so 
readers never observe
+    // a half-applied state where some of this batch's entries have been 
evicted and others are
+    // still hot.
+    ownerRelCache.runInvalidationBatch(
+        () -> {
+          for (ChangedOwnerInfo change : changes) {
+            ownerRelCache.invalidate(change.getMetadataObjectId());
+            if (change.getId() > maxSeenId[0]) {
+              maxSeenId[0] = change.getId();
+            }
+          }
+        });
+    ownerPollHighWaterId = maxSeenId[0];
+  }
+
+  /**
+   * Drains entity-change rows past {@link #entityPollHighWaterId} and 
invalidates the affected
+   * {@code metadataIdCache} keys.
+   *
+   * <p><b>Contract with the writer side:</b> {@code 
entity_change_log.full_name} must be the
+   * <i>pre-mutation</i> name (the name that consumers currently have cached). 
The writers in {@code
+   * SchemaMetaService} / {@code TableMetaService} / etc. emit {@code 
oldFullName} on rename and the
+   * current name on drop, so the cacheKey we build here resolves to the entry 
a peer node would
+   * have populated under that name. If a future change starts emitting the 
new post-rename name,
+   * this invalidation will silently miss and stale entries will only clear 
via LRU eviction.
+   *
+   * <p>The {@code synchronized} modifier is defensive — see the note on 
{@link #pollOwnerChanges()}
+   * for the rationale. The single-threaded scheduler already prevents 
overlapping runs in
+   * production, and the per-batch invalidation atomicity is provided by the 
cache itself.
+   */
+  private synchronized void pollEntityChanges() {
+    List<EntityChangeRecord> changes =
+        SessionUtils.getWithoutCommit(
+            EntityChangeLogMapper.class,
+            m -> m.selectEntityChanges(entityPollHighWaterId, 
ENTITY_CHANGE_POLLER_MAX_ROWS));
+
+    long maxSeenId = entityPollHighWaterId;
+    Set<String> containerPrefixes = new LinkedHashSet<>();
+    Set<String> leafKeys = new LinkedHashSet<>();
+    for (EntityChangeRecord change : changes) {
+      String metalake = change.getMetalakeName();
+      String entityType = change.getEntityType();
+      String fullName = change.getFullName();
+
+      MetadataObject.Type mdType;
+      try {
+        mdType = 
MetadataObject.Type.valueOf(entityType.toUpperCase(Locale.ROOT));
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Unknown entity type in change log: {}", entityType);
+        if (change.getId() > maxSeenId) {
+          maxSeenId = change.getId();
+        }
+        continue;
+      }
+
+      MetadataObject mdObj = metadataObjectFromChangeLog(metalake, fullName, 
mdType);
+      String cacheKey = JcasbinAuthorizationLookups.buildCacheKey(metalake, 
mdObj);
+
+      if (JcasbinAuthorizationLookups.isContainerType(mdType)) {
+        addCoalescedPrefix(containerPrefixes, cacheKey);
+      } else {
+        leafKeys.add(cacheKey);
+      }
+
+      if (change.getId() > maxSeenId) {
+        maxSeenId = change.getId();
+      }
+    }
+    invalidateCoalescedKeys(containerPrefixes, leafKeys);
+    entityPollHighWaterId = maxSeenId;
+  }
+
+  @Override
+  public void close() {
+    if (scheduler != null) {
+      scheduler.shutdown();
+      try {
+        if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+          scheduler.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        scheduler.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static MetadataObject metadataObjectFromChangeLog(
+      String metalake, String fullName, MetadataObject.Type type) {
+    List<String> names = new ArrayList<>(Arrays.asList(fullName.split("\\.")));
+    if (type != MetadataObject.Type.METALAKE
+        && !names.isEmpty()
+        && Objects.equals(names.get(0), metalake)) {
+      names.remove(0);
+    }
+    return MetadataObjects.of(names, type);
+  }
+
+  private static long getOrDefault(Long value) {
+    return value == null ? 0L : value;
+  }
+
+  private static void addCoalescedPrefix(Set<String> prefixes, String 
candidate) {
+    for (String prefix : prefixes) {
+      if (candidate.startsWith(prefix)) {
+        return;
+      }
+    }
+    prefixes.removeIf(prefix -> prefix.startsWith(candidate));
+    prefixes.add(candidate);
+  }
+
+  private void invalidateCoalescedKeys(Set<String> prefixes, Set<String> 
leafKeys) {
+    if (prefixes.isEmpty() && leafKeys.isEmpty()) {
+      return;
+    }
+    // Hold the cache's exclusive invalidation lock for the whole batch so 
readers never observe
+    // a half-applied state where some prefix/leaf keys have been evicted and 
others have not.
+    metadataIdCache.runInvalidationBatch(
+        () -> {
+          for (String prefix : prefixes) {
+            metadataIdCache.invalidateByPrefix(prefix);
+          }
+          for (String leafKey : leafKeys) {
+            if (prefixes.stream().noneMatch(leafKey::startsWith)) {
+              metadataIdCache.invalidate(leafKey);
+            }
+          }
+        });
+  }
+}
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizationLookups.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizationLookups.java
new file mode 100644
index 0000000000..cfab7f8010
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizationLookups.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link JcasbinAuthorizationLookups} static helpers. */
+public class TestJcasbinAuthorizationLookups {
+
+  // ---------- buildCacheKey ----------
+
+  @Test
+  void testBuildCacheKeyMetalake() {
+    MetadataObject obj =
+        MetadataObjects.of(Collections.singletonList("ml1"), 
MetadataObject.Type.METALAKE);
+    Assertions.assertEquals(key("ml1", ""), 
JcasbinAuthorizationLookups.buildCacheKey("ml1", obj));
+  }
+
+  @Test
+  void testBuildCacheKeyCatalog() {
+    MetadataObject obj =
+        MetadataObjects.of(Collections.singletonList("cat1"), 
MetadataObject.Type.CATALOG);
+    Assertions.assertEquals(
+        key("ml1", "cat1", ""), 
JcasbinAuthorizationLookups.buildCacheKey("ml1", obj));
+  }
+
+  @Test
+  void testBuildCacheKeySchema() {
+    MetadataObject obj =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1"), 
MetadataObject.Type.SCHEMA);
+    Assertions.assertEquals(
+        key("ml1", "cat1", "sch1", ""), 
JcasbinAuthorizationLookups.buildCacheKey("ml1", obj));
+  }
+
+  @Test
+  void testBuildCacheKeyLeafTypesGetTypeSuffix() {
+    MetadataObject table =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "tbl1"), 
MetadataObject.Type.TABLE);
+    Assertions.assertEquals(
+        key("ml1", "cat1", "sch1", "tbl1", "TABLE"),
+        JcasbinAuthorizationLookups.buildCacheKey("ml1", table));
+
+    MetadataObject view =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "v1"), 
MetadataObject.Type.VIEW);
+    Assertions.assertEquals(
+        key("ml1", "cat1", "sch1", "v1", "VIEW"),
+        JcasbinAuthorizationLookups.buildCacheKey("ml1", view));
+
+    MetadataObject fileset =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "fs1"), 
MetadataObject.Type.FILESET);
+    Assertions.assertEquals(
+        key("ml1", "cat1", "sch1", "fs1", "FILESET"),
+        JcasbinAuthorizationLookups.buildCacheKey("ml1", fileset));
+  }
+
+  // ---------- isContainerType ----------
+
+  @Test
+  void testIsContainerTypeContainerTypes() {
+    Assertions.assertTrue(
+        
JcasbinAuthorizationLookups.isContainerType(MetadataObject.Type.METALAKE));
+    
Assertions.assertTrue(JcasbinAuthorizationLookups.isContainerType(MetadataObject.Type.CATALOG));
+    
Assertions.assertTrue(JcasbinAuthorizationLookups.isContainerType(MetadataObject.Type.SCHEMA));
+  }
+
+  @Test
+  void testIsContainerTypeLeafTypes() {
+    
Assertions.assertFalse(JcasbinAuthorizationLookups.isContainerType(MetadataObject.Type.TABLE));
+    
Assertions.assertFalse(JcasbinAuthorizationLookups.isContainerType(MetadataObject.Type.VIEW));
+    Assertions.assertFalse(
+        
JcasbinAuthorizationLookups.isContainerType(MetadataObject.Type.FILESET));
+    
Assertions.assertFalse(JcasbinAuthorizationLookups.isContainerType(MetadataObject.Type.TOPIC));
+  }
+
+  // ---------- Prefix invalidation ----------
+
+  @Test
+  void testPrefixInvalidationCoversContainerPath() {
+    // Dropping a catalog should use a prefix that covers all schemas and 
tables below it.
+    MetadataObject catalog =
+        MetadataObjects.of(Collections.singletonList("cat1"), 
MetadataObject.Type.CATALOG);
+    String catalogKey = JcasbinAuthorizationLookups.buildCacheKey("ml1", 
catalog);
+
+    MetadataObject schema =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1"), 
MetadataObject.Type.SCHEMA);
+    String schemaKey = JcasbinAuthorizationLookups.buildCacheKey("ml1", 
schema);
+
+    MetadataObject table =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "tbl1"), 
MetadataObject.Type.TABLE);
+    String tableKey = JcasbinAuthorizationLookups.buildCacheKey("ml1", table);
+
+    Assertions.assertTrue(schemaKey.startsWith(catalogKey));
+    Assertions.assertTrue(tableKey.startsWith(catalogKey));
+    Assertions.assertTrue(tableKey.startsWith(schemaKey));
+  }
+
+  @Test
+  void testResolveMetadataIdUsesAtomicSharedCacheAndRequestDedup() {
+    MetadataObject table =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "tbl1"), 
MetadataObject.Type.TABLE);
+    CountingCache<String, Long> metadataIdCache = new CountingCache<>(100L);
+    CountingCache<Long, Optional<OwnerInfo>> ownerRelCache = new 
CountingCache<>(Optional.empty());
+    JcasbinAuthorizationLookups lookups =
+        new JcasbinAuthorizationLookups(metadataIdCache, ownerRelCache);
+    AuthorizationRequestContext requestContext = new 
AuthorizationRequestContext();
+
+    Assertions.assertEquals(100L, lookups.resolveMetadataId(table, "ml1", 
requestContext));
+    Assertions.assertEquals(100L, lookups.resolveMetadataId(table, "ml1", 
requestContext));
+
+    Assertions.assertEquals(1, metadataIdCache.getCount);
+    Assertions.assertEquals(0, metadataIdCache.getIfPresentCount);
+    Assertions.assertEquals(0, metadataIdCache.putCount);
+  }
+
+  @Test
+  void testResolveOwnerIdUsesAtomicSharedCacheAndRequestDedup() {
+    CountingCache<String, Long> metadataIdCache = new CountingCache<>(100L);
+    CountingCache<Long, Optional<OwnerInfo>> ownerRelCache = new 
CountingCache<>(Optional.empty());
+    JcasbinAuthorizationLookups lookups =
+        new JcasbinAuthorizationLookups(metadataIdCache, ownerRelCache);
+    AuthorizationRequestContext requestContext = new 
AuthorizationRequestContext();
+
+    Assertions.assertFalse(
+        lookups.resolveOwnerId(100L, MetadataObject.Type.TABLE, 
requestContext).isPresent());
+    Assertions.assertFalse(
+        lookups.resolveOwnerId(100L, MetadataObject.Type.TABLE, 
requestContext).isPresent());
+
+    Assertions.assertEquals(1, ownerRelCache.getCount);
+    Assertions.assertEquals(0, ownerRelCache.getIfPresentCount);
+    Assertions.assertEquals(0, ownerRelCache.putCount);
+  }
+
+  private static String key(String... parts) {
+    return String.join(JcasbinAuthorizationLookups.KEY_SEP, parts);
+  }
+
+  private static class CountingCache<K, V> implements GravitinoCache<K, V> {
+    private final V value;
+    private int getCount;
+    private int getIfPresentCount;
+    private int putCount;
+
+    private CountingCache(V value) {
+      this.value = value;
+    }
+
+    @Override
+    public Optional<V> getIfPresent(K key) {
+      getIfPresentCount++;
+      return Optional.empty();
+    }
+
+    @Override
+    public V get(K key, Function<K, V> loader) {
+      getCount++;
+      return value;
+    }
+
+    @Override
+    public void put(K key, V value) {
+      putCount++;
+    }
+
+    @Override
+    public void invalidate(K key) {}
+
+    @Override
+    public void invalidateAll() {}
+
+    @Override
+    public void invalidateByPrefix(String prefix) {}
+
+    @Override
+    public long size() {
+      return 0;
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index afa48efc98..3e65d4fb4a 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -24,8 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
@@ -37,10 +38,15 @@ import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.security.Principal;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Entity;
@@ -56,6 +62,7 @@ import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.authorization.AuthorizationRequestContext;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.cache.GravitinoCache;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.GroupEntity;
@@ -64,15 +71,19 @@ import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.server.ServerConfig;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
-import 
org.apache.gravitino.server.authorization.jcasbin.JcasbinAuthorizer.OwnerInfo;
+import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
 import org.apache.gravitino.storage.relational.service.OwnerMetaService;
 import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.casbin.jcasbin.main.Enforcer;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -114,6 +125,12 @@ public class TestJcasbinAuthorizer {
 
   private static MockedStatic<OwnerMetaService> ownerMetaServiceMockedStatic;
 
+  private static MockedStatic<SessionUtils> sessionUtilsMockedStatic;
+
+  private static OwnerMetaMapper ownerMetaMapper = mock(OwnerMetaMapper.class);
+
+  private static EntityChangeLogMapper entityChangeLogMapper = 
mock(EntityChangeLogMapper.class);
+
   private static JcasbinAuthorizer jcasbinAuthorizer;
 
   private static ObjectMapper objectMapper = new ObjectMapper();
@@ -123,6 +140,31 @@ public class TestJcasbinAuthorizer {
     OwnerMetaService ownerMetaService = mock(OwnerMetaService.class);
     ownerMetaServiceMockedStatic = mockStatic(OwnerMetaService.class);
     
ownerMetaServiceMockedStatic.when(OwnerMetaService::getInstance).thenReturn(ownerMetaService);
+    when(ownerMetaMapper.selectMaxChangeId()).thenReturn(0L);
+    
when(ownerMetaMapper.selectChangedOwners(anyLong())).thenReturn(Collections.emptyList());
+    when(entityChangeLogMapper.selectMaxChangeId()).thenReturn(0L);
+    when(entityChangeLogMapper.selectEntityChanges(anyLong(), anyInt()))
+        .thenReturn(Collections.emptyList());
+
+    // The change poller probes entity_change_log + owner_meta on startup and 
owner lookups go via
+    // SessionUtils; mock SessionUtils to delegate to mapper mocks so tests 
can stub owner state
+    // without opening a real MyBatis session. Poller-only mapper calls return 
safe empty defaults.
+    sessionUtilsMockedStatic = mockStatic(SessionUtils.class);
+    sessionUtilsMockedStatic
+        .when(() -> SessionUtils.getWithoutCommit(any(), any()))
+        .thenAnswer(
+            invocation -> {
+              Class<?> mapperClass = invocation.getArgument(0);
+              Function<Object, Object> func = invocation.getArgument(1);
+              if (mapperClass == OwnerMetaMapper.class) {
+                return func.apply(ownerMetaMapper);
+              }
+              if (mapperClass == EntityChangeLogMapper.class) {
+                return func.apply(entityChangeLogMapper);
+              }
+              return null;
+            });
+
     gravitinoEnvMockedStatic = mockStatic(GravitinoEnv.class);
     
gravitinoEnvMockedStatic.when(GravitinoEnv::getInstance).thenReturn(gravitinoEnv);
     when(gravitinoEnv.config()).thenReturn(new ServerConfig());
@@ -161,6 +203,13 @@ public class TestJcasbinAuthorizer {
 
   @AfterAll
   public static void stop() {
+    if (jcasbinAuthorizer != null) {
+      try {
+        jcasbinAuthorizer.close();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to close JcasbinAuthorizer", e);
+      }
+    }
     if (principalUtilsMockedStatic != null) {
       principalUtilsMockedStatic.close();
     }
@@ -170,6 +219,9 @@ public class TestJcasbinAuthorizer {
     if (ownerMetaServiceMockedStatic != null) {
       ownerMetaServiceMockedStatic.close();
     }
+    if (sessionUtilsMockedStatic != null) {
+      sessionUtilsMockedStatic.close();
+    }
     if (gravitinoEnvMockedStatic != null) {
       gravitinoEnvMockedStatic.close();
     }
@@ -258,23 +310,23 @@ public class TestJcasbinAuthorizer {
   @Test
   public void testAuthorizeByOwner() throws Exception {
     Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
-    assertFalse(doAuthorizeOwner(currentPrincipal));
     NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE, 
"testCatalog");
-    List<UserEntity> owners = ImmutableList.of(getUserEntity());
-    doReturn(owners)
-        .when(supportsRelationOperations)
-        .listEntitiesByRelation(
-            eq(SupportsRelationOperations.Type.OWNER_REL),
-            eq(catalogIdent),
-            eq(Entity.EntityType.CATALOG));
+
+    // No owner set — should fail
+    when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID), 
eq("CATALOG")))
+        .thenReturn(null);
+    getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
+    assertFalse(doAuthorizeOwner(currentPrincipal));
+
+    // Set owner to current user
+    when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID), 
eq("CATALOG")))
+        .thenReturn(new OwnerInfo(USER_ID, "USER"));
     getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
     assertTrue(doAuthorizeOwner(currentPrincipal));
-    doReturn(new ArrayList<>())
-        .when(supportsRelationOperations)
-        .listEntitiesByRelation(
-            eq(SupportsRelationOperations.Type.OWNER_REL),
-            eq(catalogIdent),
-            eq(Entity.EntityType.CATALOG));
+
+    // Remove owner via change hook
+    when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID), 
eq("CATALOG")))
+        .thenReturn(null);
     jcasbinAuthorizer.handleMetadataOwnerChange(
         METALAKE, USER_ID, catalogIdent, Entity.EntityType.CATALOG);
     assertFalse(doAuthorizeOwner(currentPrincipal));
@@ -311,26 +363,18 @@ public class TestJcasbinAuthorizer {
                     .withAuditInfo(AuditInfo.EMPTY)
                     .build()));
 
-    // Mock owner relation returning a GroupEntity
-    List<GroupEntity> owners = ImmutableList.of(getGroupEntity());
-    doReturn(owners)
-        .when(supportsRelationOperations)
-        .listEntitiesByRelation(
-            eq(SupportsRelationOperations.Type.OWNER_REL),
-            eq(catalogIdent),
-            eq(Entity.EntityType.CATALOG));
+    // Mock owner_meta returning a GROUP-typed owner with GROUP_ID
+    OwnerInfo groupOwnerInfo = new OwnerInfo(GROUP_ID, "GROUP");
+    when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID), 
eq("CATALOG")))
+        .thenReturn(groupOwnerInfo);
     getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
 
     // The principal belongs to the owning group, so isOwner should return true
     assertTrue(doAuthorizeOwner(groupPrincipal));
 
     // Clear owner and verify it returns false
-    doReturn(new ArrayList<>())
-        .when(supportsRelationOperations)
-        .listEntitiesByRelation(
-            eq(SupportsRelationOperations.Type.OWNER_REL),
-            eq(catalogIdent),
-            eq(Entity.EntityType.CATALOG));
+    when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID), 
eq("CATALOG")))
+        .thenReturn(null);
     jcasbinAuthorizer.handleMetadataOwnerChange(
         METALAKE, GROUP_ID, catalogIdent, Entity.EntityType.CATALOG);
     assertFalse(doAuthorizeOwner(groupPrincipal));
@@ -342,13 +386,8 @@ public class TestJcasbinAuthorizer {
     principalUtilsMockedStatic
         .when(PrincipalUtils::getCurrentPrincipal)
         .thenReturn(nonMemberPrincipal);
-    // Re-populate the owner cache with the group owner
-    doReturn(ImmutableList.of(getGroupEntity()))
-        .when(supportsRelationOperations)
-        .listEntitiesByRelation(
-            eq(SupportsRelationOperations.Type.OWNER_REL),
-            eq(catalogIdent),
-            eq(Entity.EntityType.CATALOG));
+    when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID), 
eq("CATALOG")))
+        .thenReturn(groupOwnerInfo);
     getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
     assertFalse(doAuthorizeOwner(nonMemberPrincipal));
 
@@ -984,14 +1023,13 @@ public class TestJcasbinAuthorizer {
 
   @Test
   public void testOwnerCacheInvalidation() throws Exception {
-    // Get the ownerRel cache via reflection
-    Cache<Long, Optional<OwnerInfo>> ownerRel = 
getOwnerRelCache(jcasbinAuthorizer);
+    GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache = 
getOwnerRelCache(jcasbinAuthorizer);
 
     // Manually add an owner relation to the cache
-    ownerRel.put(CATALOG_ID, Optional.of(new OwnerInfo(USER_ID, 
Entity.EntityType.USER, USERNAME)));
+    ownerRelCache.put(CATALOG_ID, Optional.of(new OwnerInfo(USER_ID, "USER")));
 
     // Verify it's in the cache
-    assertNotNull(ownerRel.getIfPresent(CATALOG_ID));
+    assertTrue(ownerRelCache.getIfPresent(CATALOG_ID).isPresent());
 
     // Create a mock NameIdentifier for the metadata object
     NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE, 
"testCatalog");
@@ -1001,7 +1039,74 @@ public class TestJcasbinAuthorizer {
         METALAKE, USER_ID, catalogIdent, Entity.EntityType.CATALOG);
 
     // Verify it's removed from the cache
-    assertNull(ownerRel.getIfPresent(CATALOG_ID));
+    assertFalse(ownerRelCache.getIfPresent(CATALOG_ID).isPresent());
+  }
+
+  @Test
+  public void testOwnerChangeBestEffortWhenMetadataIdLookupFails() throws 
Exception {
+    GravitinoCache<String, Long> metadataIdCache = 
getMetadataIdCache(jcasbinAuthorizer);
+    GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache = 
getOwnerRelCache(jcasbinAuthorizer);
+    NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE, 
"testCatalog");
+    String cacheKey =
+        JcasbinAuthorizationLookups.buildCacheKey(
+            METALAKE, NameIdentifierUtil.toMetadataObject(catalogIdent, 
Entity.EntityType.CATALOG));
+
+    metadataIdCache.put(cacheKey, CATALOG_ID);
+    ownerRelCache.put(CATALOG_ID, Optional.of(new OwnerInfo(USER_ID, "USER")));
+    metadataIdConverterMockedStatic
+        .when(() -> MetadataIdConverter.getID(any(), eq(METALAKE)))
+        .thenThrow(new RuntimeException("lookup failed"));
+
+    try {
+      Assertions.assertDoesNotThrow(
+          () ->
+              jcasbinAuthorizer.handleMetadataOwnerChange(
+                  METALAKE, USER_ID, catalogIdent, Entity.EntityType.CATALOG));
+
+      assertFalse(metadataIdCache.getIfPresent(cacheKey).isPresent());
+      assertTrue(ownerRelCache.getIfPresent(CATALOG_ID).isPresent());
+    } finally {
+      metadataIdConverterMockedStatic
+          .when(() -> MetadataIdConverter.getID(any(), eq(METALAKE)))
+          .thenReturn(CATALOG_ID);
+    }
+  }
+
+  @Test
+  public void testCloseAwaitsRoleLoadExecutorTermination() throws Exception {
+    RecordingThreadPoolExecutor executor = new RecordingThreadPoolExecutor();
+
+    Field executorField = JcasbinAuthorizer.class.getDeclaredField("executor");
+    Field changePollerField = 
JcasbinAuthorizer.class.getDeclaredField("changePoller");
+    Field metadataIdCacheField = 
JcasbinAuthorizer.class.getDeclaredField("metadataIdCache");
+    Field ownerRelCacheField = 
JcasbinAuthorizer.class.getDeclaredField("ownerRelCache");
+
+    Executor originalExecutor =
+        (Executor) FieldUtils.readField(executorField, jcasbinAuthorizer, 
true);
+    Object originalPoller = FieldUtils.readField(changePollerField, 
jcasbinAuthorizer, true);
+    Object originalMetadataIdCache =
+        FieldUtils.readField(metadataIdCacheField, jcasbinAuthorizer, true);
+    Object originalOwnerRelCache =
+        FieldUtils.readField(ownerRelCacheField, jcasbinAuthorizer, true);
+
+    try {
+      FieldUtils.writeField(executorField, jcasbinAuthorizer, executor, true);
+      // Detach shared resources so close() only exercises the executor 
shutdown branch and does
+      // not leave the shared poller / caches in a closed state for subsequent 
tests.
+      FieldUtils.writeField(changePollerField, jcasbinAuthorizer, null, true);
+      FieldUtils.writeField(metadataIdCacheField, jcasbinAuthorizer, null, 
true);
+      FieldUtils.writeField(ownerRelCacheField, jcasbinAuthorizer, null, true);
+
+      jcasbinAuthorizer.close();
+
+      assertTrue(executor.shutdownCalled.get());
+      assertTrue(executor.awaitTerminationCalled.get());
+    } finally {
+      FieldUtils.writeField(executorField, jcasbinAuthorizer, 
originalExecutor, true);
+      FieldUtils.writeField(changePollerField, jcasbinAuthorizer, 
originalPoller, true);
+      FieldUtils.writeField(metadataIdCacheField, jcasbinAuthorizer, 
originalMetadataIdCache, true);
+      FieldUtils.writeField(ownerRelCacheField, jcasbinAuthorizer, 
originalOwnerRelCache, true);
+    }
   }
 
   @Test
@@ -1043,10 +1148,10 @@ public class TestJcasbinAuthorizer {
   public void testCacheInitialization() throws Exception {
     // Verify that caches are initialized
     Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
-    Cache<Long, Optional<OwnerInfo>> ownerRel = 
getOwnerRelCache(jcasbinAuthorizer);
+    GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache = 
getOwnerRelCache(jcasbinAuthorizer);
 
     assertNotNull(loadedRoles, "loadedRoles cache should be initialized");
-    assertNotNull(ownerRel, "ownerRel cache should be initialized");
+    assertNotNull(ownerRelCache, "ownerRelCache should be initialized");
   }
 
   /** Tests {@link JcasbinAuthorizer#hasMetadataPrivilegePermission} hierarchy 
walk */
@@ -1199,11 +1304,19 @@ public class TestJcasbinAuthorizer {
   }
 
   @SuppressWarnings("unchecked")
-  private static Cache<Long, Optional<OwnerInfo>> 
getOwnerRelCache(JcasbinAuthorizer authorizer)
+  private static GravitinoCache<Long, Optional<OwnerInfo>> getOwnerRelCache(
+      JcasbinAuthorizer authorizer) throws Exception {
+    Field field = JcasbinAuthorizer.class.getDeclaredField("ownerRelCache");
+    field.setAccessible(true);
+    return (GravitinoCache<Long, Optional<OwnerInfo>>) field.get(authorizer);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static GravitinoCache<String, Long> 
getMetadataIdCache(JcasbinAuthorizer authorizer)
       throws Exception {
-    Field field = JcasbinAuthorizer.class.getDeclaredField("ownerRel");
+    Field field = JcasbinAuthorizer.class.getDeclaredField("metadataIdCache");
     field.setAccessible(true);
-    return (Cache<Long, Optional<OwnerInfo>>) field.get(authorizer);
+    return (GravitinoCache<String, Long>) field.get(authorizer);
   }
 
   private static Enforcer getAllowEnforcer(JcasbinAuthorizer authorizer) 
throws Exception {
@@ -1217,4 +1330,25 @@ public class TestJcasbinAuthorizer {
     field.setAccessible(true);
     return (Enforcer) field.get(authorizer);
   }
+
+  private static class RecordingThreadPoolExecutor extends ThreadPoolExecutor {
+    private final AtomicBoolean shutdownCalled = new AtomicBoolean();
+    private final AtomicBoolean awaitTerminationCalled = new AtomicBoolean();
+
+    private RecordingThreadPoolExecutor() {
+      super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+    }
+
+    @Override
+    public void shutdown() {
+      shutdownCalled.set(true);
+      super.shutdown();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+      awaitTerminationCalled.set(true);
+      return super.awaitTermination(timeout, unit);
+    }
+  }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinChangePoller.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinChangePoller.java
new file mode 100644
index 0000000000..867a862603
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinChangePoller.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.apache.gravitino.storage.relational.po.cache.EntityChangeRecord;
+import org.apache.gravitino.storage.relational.po.cache.OperateType;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+/** Tests for {@link JcasbinChangePoller} static helpers. */
+public class TestJcasbinChangePoller {
+
+  @Test
+  void testRejectsNonPositivePollInterval() {
+    RecordingCache<String, Long> metadataIdCache = new RecordingCache<>();
+    RecordingCache<Long, Optional<OwnerInfo>> ownerRelCache = new 
RecordingCache<>();
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new JcasbinChangePoller(metadataIdCache, ownerRelCache, 0));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> new JcasbinChangePoller(metadataIdCache, ownerRelCache, -1));
+  }
+
+  @Test
+  void testChangeLogFullNameStripsLeadingMetalakeForChildTypes() {
+    MetadataObject catalog =
+        JcasbinChangePoller.metadataObjectFromChangeLog(
+            "ml1", "ml1.cat1", MetadataObject.Type.CATALOG);
+    Assertions.assertEquals(
+        key("ml1", "cat1", ""), 
JcasbinAuthorizationLookups.buildCacheKey("ml1", catalog));
+
+    MetadataObject schema =
+        JcasbinChangePoller.metadataObjectFromChangeLog(
+            "ml1", "ml1.cat1.sch1", MetadataObject.Type.SCHEMA);
+    Assertions.assertEquals(
+        key("ml1", "cat1", "sch1", ""), 
JcasbinAuthorizationLookups.buildCacheKey("ml1", schema));
+
+    MetadataObject table =
+        JcasbinChangePoller.metadataObjectFromChangeLog(
+            "ml1", "ml1.cat1.sch1.tbl1", MetadataObject.Type.TABLE);
+    Assertions.assertEquals(
+        key("ml1", "cat1", "sch1", "tbl1", "TABLE"),
+        JcasbinAuthorizationLookups.buildCacheKey("ml1", table));
+  }
+
+  @Test
+  void testChangeLogFullNameForMetalakeKeepsItself() {
+    MetadataObject metalake =
+        JcasbinChangePoller.metadataObjectFromChangeLog("ml1", "ml1", 
MetadataObject.Type.METALAKE);
+    Assertions.assertEquals(
+        key("ml1", ""), JcasbinAuthorizationLookups.buildCacheKey("ml1", 
metalake));
+  }
+
+  @Test
+  void testPollEntityChangesCoalescesContainerPrefixes() {
+    RecordingCache<String, Long> metadataIdCache = new RecordingCache<>();
+    RecordingCache<Long, Optional<OwnerInfo>> ownerRelCache = new 
RecordingCache<>();
+    EntityChangeLogMapper entityChangeLogMapper = 
mock(EntityChangeLogMapper.class);
+    OwnerMetaMapper ownerMetaMapper = mock(OwnerMetaMapper.class);
+
+    
when(ownerMetaMapper.selectChangedOwners(0L)).thenReturn(Collections.emptyList());
+    when(entityChangeLogMapper.selectEntityChanges(0L, 500))
+        .thenReturn(
+            List.of(
+                change(1L, MetadataObject.Type.CATALOG, "ml1.cat1"),
+                change(2L, MetadataObject.Type.SCHEMA, "ml1.cat1.sch1"),
+                change(3L, MetadataObject.Type.TABLE, "ml1.cat1.sch1.tbl1"),
+                change(4L, MetadataObject.Type.TABLE, "ml1.cat2.sch1.tbl1")));
+
+    try (MockedStatic<SessionUtils> sessionUtils = 
mockStatic(SessionUtils.class)) {
+      sessionUtils
+          .when(() -> SessionUtils.getWithoutCommit(any(), any()))
+          .thenAnswer(
+              invocation -> {
+                Class<?> mapperClass = invocation.getArgument(0);
+                Function<Object, Object> func = invocation.getArgument(1);
+                if (mapperClass == OwnerMetaMapper.class) {
+                  return func.apply(ownerMetaMapper);
+                }
+                if (mapperClass == EntityChangeLogMapper.class) {
+                  return func.apply(entityChangeLogMapper);
+                }
+                return null;
+              });
+
+      JcasbinChangePoller poller = new JcasbinChangePoller(metadataIdCache, 
ownerRelCache, 1);
+      poller.pollChanges();
+    }
+
+    Assertions.assertEquals(List.of(key("ml1", "cat1", "")), 
metadataIdCache.invalidatedPrefixes);
+    Assertions.assertEquals(
+        List.of(key("ml1", "cat2", "sch1", "tbl1", "TABLE")), 
metadataIdCache.invalidatedKeys);
+  }
+
+  @Test
+  void testPollCursorAdvancementIsSynchronized() throws NoSuchMethodException {
+    Method pollOwnerChanges = 
JcasbinChangePoller.class.getDeclaredMethod("pollOwnerChanges");
+    Method pollEntityChanges = 
JcasbinChangePoller.class.getDeclaredMethod("pollEntityChanges");
+
+    
Assertions.assertTrue(Modifier.isSynchronized(pollOwnerChanges.getModifiers()));
+    
Assertions.assertTrue(Modifier.isSynchronized(pollEntityChanges.getModifiers()));
+  }
+
+  private static String key(String... parts) {
+    return String.join(JcasbinAuthorizationLookups.KEY_SEP, parts);
+  }
+
+  private static EntityChangeRecord change(long id, MetadataObject.Type type, 
String fullName) {
+    return new EntityChangeRecord(id, "ml1", type.name(), fullName, 
OperateType.ALTER, 0L);
+  }
+
+  private static class RecordingCache<K, V> implements GravitinoCache<K, V> {
+    private final List<K> invalidatedKeys = new ArrayList<>();
+    private final List<String> invalidatedPrefixes = new ArrayList<>();
+
+    @Override
+    public Optional<V> getIfPresent(K key) {
+      return Optional.empty();
+    }
+
+    @Override
+    public void put(K key, V value) {}
+
+    @Override
+    public void invalidate(K key) {
+      invalidatedKeys.add(key);
+    }
+
+    @Override
+    public void invalidateAll() {}
+
+    @Override
+    public void invalidateByPrefix(String prefix) {
+      invalidatedPrefixes.add(prefix);
+    }
+
+    @Override
+    public long size() {
+      return 0;
+    }
+
+    @Override
+    public void close() {}
+  }
+}

Reply via email to