This is an automated email from the ASF dual-hosted git repository.
jerryshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 584fcd3896 [#10772] feat(authz): Eventual-consistency invalidation for
JcasbinAuthorizer caches (#11117)
584fcd3896 is described below
commit 584fcd3896354f75dd6d3a8add3961bdf9625eb2
Author: Qi Yu <[email protected]>
AuthorDate: Wed May 20 14:16:38 2026 +0800
[#10772] feat(authz): Eventual-consistency invalidation for
JcasbinAuthorizer caches (#11117)
### What changes were proposed in this pull request?
Introduces the HA invalidation infrastructure that the JcasbinAuthorizer
cache refactor (#10772) needs as a prerequisite. The existing
role-loading path keeps its current TTL-only behavior; the
version-validated user / group / role caches and the JcasbinRoleLoader
will be added in a follow-up PR that builds on this one.
**New classes:**
- JcasbinAuthorizationLookups: two-tier metadata-id and owner lookup
facade (per-request dedup via AuthorizationRequestContext, shared
Caffeine-backed GravitinoCache fallback, DB on miss). Owners are now
fetched directly from owner_meta via OwnerMetaMapper instead of the
OWNER_REL relation query, so the cache key space matches the
entity-change-log key space.
- JcasbinChangePoller: scheduled thread that drains entity_change_log
and owner_meta change rows since a high-water cursor and invalidates the
affected cache keys. Documents the id-cursor in-flight-commit trade-off
and the writer-side pre-mutation-name contract.
**Modified:**
- JcasbinAuthorizer: wires the two GravitinoCaches + lookups + poller in
initialize/close; routes isOwner, OWNER-privilege authorization, and
handleMetadataOwnerChange through the lookups; drops the in-class
OwnerInfo (replaced by
org.apache.gravitino.storage.relational.po.auth.OwnerInfo) and collapses
loadOwnerPolicy + checkOwnership into ownerMatchesUserOrGroups;
implements handleEntityNameIdMappingChange.
- GravitinoAuthorizer: adds the handleEntityNameIdMappingChange default
method.
- Configs: adds gravitino.authorization.jcasbin.metadataIdCacheSize
(default 100000) and
gravitino.authorization.jcasbin.changePollIntervalSecs (default 3).
### Why are the changes needed?
This PR is split out of #10996 (tracking issue #10772) as a prerequisite,
so that the follow-up PR stays scoped to the role-loading rewrite alone.
Fix: #10772
### Does this PR introduce _any_ user-facing change?
No. New configs ship with safe defaults and existing behavior is
preserved for the role-loading path.
### How was this patch tested?
- TestJcasbinAuthorizer: existing cases (owner mocks adjusted to the new
OwnerMetaMapper.selectOwnerByMetadataObjectIdAndType path).
- TestJcasbinAuthorizationLookups: buildCacheKey / isContainerType /
cascade-prefix hierarchy.
- TestJcasbinChangePoller: metadataObjectFromChangeLog builds cache keys
matching the writer-side full-name format.
```
./gradlew :server-common:test --tests \
org.apache.gravitino.server.authorization.jcasbin.TestJcasbinChangePoller \
--tests org.apache.gravitino.server.authorization.jcasbin.TestJcasbinAuthorizer
./gradlew :core:test --tests \
org.apache.gravitino.authorization.TestAuthorizationUtils
```
---------
Co-authored-by: Claude Opus 4.7 <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.../main/java/org/apache/gravitino/Configs.java | 19 ++
.../authorization/AuthorizationUtils.java | 16 ++
.../authorization/GravitinoAuthorizer.java | 21 +-
.../gravitino/authorization/OwnerManager.java | 95 +++---
.../gravitino/cache/CaffeineGravitinoCache.java | 104 ++++++-
.../org/apache/gravitino/cache/GravitinoCache.java | 42 ++-
.../gravitino/cache/NoOpsGravitinoCache.java | 7 +
.../authorization/TestAuthorizationUtils.java | 28 ++
.../gravitino/authorization/TestOwnerManager.java | 41 +++
.../apache/gravitino/cache/TestGravitinoCache.java | 165 +++++++++++
docs/security/access-control.md | 8 +
.../jcasbin/JcasbinAuthorizationLookups.java | 149 ++++++++++
.../authorization/jcasbin/JcasbinAuthorizer.java | 214 +++++++-------
.../authorization/jcasbin/JcasbinChangePoller.java | 320 +++++++++++++++++++++
.../jcasbin/TestJcasbinAuthorizationLookups.java | 207 +++++++++++++
.../jcasbin/TestJcasbinAuthorizer.java | 230 +++++++++++----
.../jcasbin/TestJcasbinChangePoller.java | 182 ++++++++++++
17 files changed, 1655 insertions(+), 193 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index 2f09aa54c8..1c50d74402 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -330,6 +330,25 @@ public class Configs {
.longConf()
.createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_OWNER_CACHE_SIZE);
+ public static final long
DEFAULT_GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE = 100000L;
+
+ public static final ConfigEntry<Long>
GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE =
+ new ConfigBuilder("gravitino.authorization.jcasbin.metadataIdCacheSize")
+ .doc("The maximum size of the metadata-id cache for authorization")
+ .version(ConfigConstants.VERSION_1_3_0)
+ .longConf()
+
.createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE);
+
+ public static final long
DEFAULT_GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS = 3L;
+
+ public static final ConfigEntry<Long>
GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS =
+ new
ConfigBuilder("gravitino.authorization.jcasbin.changePollIntervalSecs")
+ .doc("The interval in seconds for polling entity and owner changes")
+ .version(ConfigConstants.VERSION_1_3_0)
+ .longConf()
+ .checkValue(value -> value > 0,
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+
.createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS);
+
public static final ConfigEntry<List<String>> SERVICE_ADMINS =
new ConfigBuilder("gravitino.authorization.serviceAdmins")
.doc("The admins of Gravitino service")
diff --git
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 16642a5855..2f5518ad84 100644
---
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -366,6 +366,7 @@ public class AuthorizationUtils {
// If we enable authorization, we should rename the privileges about the
entity in the
// authorization plugin.
if (GravitinoEnv.getInstance().accessControlDispatcher() != null) {
+ notifyEntityNameIdMappingChange(ident, type);
MetadataObject oldMetadataObject =
NameIdentifierUtil.toMetadataObject(ident, type);
MetadataObject newMetadataObject =
NameIdentifierUtil.toMetadataObject(NameIdentifier.of(ident.namespace(),
newName), type);
@@ -386,6 +387,21 @@ public class AuthorizationUtils {
}
}
+ private static void notifyEntityNameIdMappingChange(
+ NameIdentifier ident, Entity.EntityType type) {
+ GravitinoAuthorizer gravitinoAuthorizer =
GravitinoEnv.getInstance().gravitinoAuthorizer();
+ if (gravitinoAuthorizer == null) {
+ return;
+ }
+ String metalake =
+ type == Entity.EntityType.METALAKE ? ident.name() :
ident.namespace().level(0);
+ try {
+ gravitinoAuthorizer.handleEntityNameIdMappingChange(metalake, ident,
type);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to notify entity name-id mapping change for {}", ident,
e);
+ }
+ }
+
public static Role filterSecurableObjects(
RoleEntity role, String metalakeName, String catalogName) {
List<SecurableObject> securableObjects = role.securableObjects();
diff --git
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
index 6ab74382ad..12529f74c6 100644
---
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
+++
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
@@ -19,6 +19,7 @@ package org.apache.gravitino.authorization;
import java.io.Closeable;
import java.security.Principal;
+import javax.annotation.Nullable;
import org.apache.gravitino.Entity;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.MetadataObject;
@@ -160,10 +161,26 @@ public interface GravitinoAuthorizer extends Closeable {
* changes.
*
* @param metalake metalake;
- * @param oldOwnerId The old owner id;
+ * @param oldOwnerId The old owner id; null when setting the first owner.
* @param nameIdentifier The metadata name identifier;
* @param type entity type
*/
void handleMetadataOwnerChange(
- String metalake, Long oldOwnerId, NameIdentifier nameIdentifier,
Entity.EntityType type);
+ String metalake,
+ @Nullable Long oldOwnerId,
+ NameIdentifier nameIdentifier,
+ Entity.EntityType type);
+
+ /**
+ * Called when an entity name-to-id mapping may have changed because of a
rename or drop.
+ * Implementations evict the cache key for the given entity and all of its
descendants.
+ *
+ * @param metalake the metalake name
+ * @param nameIdentifier the entity name identifier
+ * @param type the entity type
+ */
+ default void handleEntityNameIdMappingChange(
+ String metalake, NameIdentifier nameIdentifier, Entity.EntityType type) {
+ // default no-op for backward compatibility
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
index 70e10f1488..9e17ef64ec 100644
--- a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
+++ b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.authorization;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import javax.annotation.Nullable;
import lombok.Getter;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
@@ -118,7 +119,7 @@ public class OwnerManager implements OwnerDispatcher {
metadataObject,
authorizationPlugin ->
authorizationPlugin.onOwnerSet(metadataObject,
originOwner.orElse(null), newOwner));
- originOwner.ifPresent(owner -> notifyOwnerChange(owner, metalake,
metadataObject));
+ notifyOwnerChange(originOwner.orElse(null), metalake, metadataObject);
} catch (NoSuchEntityException nse) {
LOG.warn(
"Metadata object {} or owner {} is not found",
metadataObject.fullName(), ownerName, nse);
@@ -134,41 +135,65 @@ public class OwnerManager implements OwnerDispatcher {
}
}
- private void notifyOwnerChange(Owner oldOwner, String metalake,
MetadataObject metadataObject) {
+ private void notifyOwnerChange(
+ @Nullable Owner oldOwner, String metalake, MetadataObject
metadataObject) {
GravitinoAuthorizer gravitinoAuthorizer =
GravitinoEnv.getInstance().gravitinoAuthorizer();
- if (gravitinoAuthorizer != null) {
- try {
- Long oldOwnerId;
- if (oldOwner.type() == Owner.Type.USER) {
- UserEntity userEntity =
- GravitinoEnv.getInstance()
- .entityStore()
- .get(
- NameIdentifierUtil.ofUser(metalake, oldOwner.name()),
- Entity.EntityType.USER,
- UserEntity.class);
- oldOwnerId = userEntity.id();
- } else if (oldOwner.type() == Owner.Type.GROUP) {
- GroupEntity groupEntity =
- GravitinoEnv.getInstance()
- .entityStore()
- .get(
- NameIdentifierUtil.ofGroup(metalake, oldOwner.name()),
- Entity.EntityType.GROUP,
- GroupEntity.class);
- oldOwnerId = groupEntity.id();
- } else {
- LOG.warn("Unsupported owner type: {}", oldOwner.type());
- return;
- }
- gravitinoAuthorizer.handleMetadataOwnerChange(
- metalake,
- oldOwnerId,
- MetadataObjectUtil.toEntityIdent(metalake, metadataObject),
- Entity.EntityType.valueOf(metadataObject.type().name()));
- } catch (IOException e) {
- LOG.warn(e.getMessage(), e);
- }
+ if (gravitinoAuthorizer == null) {
+ return;
+ }
+
+ Long oldOwnerId;
+ try {
+ oldOwnerId = oldOwner == null ? null : getOwnerId(metalake, oldOwner);
+ } catch (IOException e) {
+ LOG.warn(
+ "Failed to resolve previous owner id for {} (metalake={},
oldOwner={}); "
+ + "cache invalidation for this change may be skipped on this
node",
+ metadataObject.fullName(),
+ metalake,
+ oldOwner,
+ e);
+ return;
+ }
+
+ try {
+ gravitinoAuthorizer.handleMetadataOwnerChange(
+ metalake,
+ oldOwnerId,
+ MetadataObjectUtil.toEntityIdent(metalake, metadataObject),
+ Entity.EntityType.valueOf(metadataObject.type().name()));
+ } catch (RuntimeException e) {
+ // Best-effort hook: a failing authorizer must not fail the owner-change
operation itself.
+ LOG.warn(
+ "Authorizer hook failed for owner change on {} (metalake={},
oldOwnerId={})",
+ metadataObject.fullName(),
+ metalake,
+ oldOwnerId,
+ e);
+ }
+ }
+
+ private Long getOwnerId(String metalake, Owner owner) throws IOException {
+ if (owner.type() == Owner.Type.USER) {
+ UserEntity userEntity =
+ GravitinoEnv.getInstance()
+ .entityStore()
+ .get(
+ NameIdentifierUtil.ofUser(metalake, owner.name()),
+ Entity.EntityType.USER,
+ UserEntity.class);
+ return userEntity.id();
+ } else if (owner.type() == Owner.Type.GROUP) {
+ GroupEntity groupEntity =
+ GravitinoEnv.getInstance()
+ .entityStore()
+ .get(
+ NameIdentifierUtil.ofGroup(metalake, owner.name()),
+ Entity.EntityType.GROUP,
+ GroupEntity.class);
+ return groupEntity.id();
+ } else {
+ throw new IllegalArgumentException("Unsupported owner type: " +
owner.type());
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java
b/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java
index f1ed54df75..2154dea3dc 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineGravitinoCache.java
@@ -22,13 +22,23 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
/**
* A Caffeine-backed implementation of {@link GravitinoCache}. Supports
configurable TTL and maximum
* size.
*
+ * <p>A fair {@link ReentrantReadWriteLock} serialises invalidations against
reads/puts. Reads,
+ * loader-backed gets, and puts share the read lock; {@code invalidate*}
operations take the write
+ * lock. See the field-level Javadoc on {@link #lock} for the races this
guards against and the
+ * cost/benefit reasoning. Callers that need a multi-step invalidation to
appear atomic wrap their
+ * sequence in {@link #runInvalidationBatch(Runnable)}.
+ *
* @param <K> the key type
* @param <V> the value type
*/
@@ -36,6 +46,37 @@ public class CaffeineGravitinoCache<K, V> implements
GravitinoCache<K, V> {
private final Cache<K, V> cache;
+ /**
+ * Serialises invalidations against reads/puts so a batched invalidation
never appears
+ * half-applied to concurrent readers. Two related races motivate this:
+ *
+ * <ol>
+ * <li><b>Single-node read/invalidate race (addressed here).</b> When the
change poller drains a
+ * batch of stale keys and calls {@link #invalidate} per key, a reader
can interleave
+ * between two invalidations and observe a half-applied batch — some
entries already
+ * evicted, others still hot. The exclusive write lock makes each
batch atomic from a
+ * reader's point of view: readers see either the pre-batch state or
the post-batch state,
+ * never a mix. Callers that need multi-step atomicity wrap their
sequence in {@link
+ * #runInvalidationBatch(Runnable)}.
+ * <li><b>Multi-node staleness window (NOT addressed here, and no lock
can).</b> After a commit
+ * on node A, node B keeps returning stale data until B's poller runs
(~poll interval, on
+ * the order of seconds). The race in (1) is a ~ms slice of that
larger window and
+ * self-heals once the batch completes — the affected keys are gone
from the cache, so the
+ * next read pulls fresh data. Callers that need read-after-write
consistency must bypass
+ * the cache; nothing on node B can make node A's commit visible
sooner.
+ * </ol>
+ *
+ * <p>Cost: the poller runs every few seconds with small batches, so
contention is rare; a fair RW
+ * lock under no contention is on the order of nanoseconds. Dropping the
lock would be defensible
+ * — race (2) dominates the staleness budget — but keeping it makes the
cache's invalidation
+ * semantics easy to reason about and test, at a cost we won't measure on
the hot path.
+ *
+ * <p>Fair mode prevents writer starvation: a parked {@code invalidate*}
call queues new readers
+ * behind it instead of letting them slip in ahead. Reads sit on the
authorization hot path and
+ * would otherwise starve the rarer invalidations.
+ */
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
/**
* Creates a new CaffeineGravitinoCache with the given TTL and maximum size.
*
@@ -57,35 +98,57 @@ public class CaffeineGravitinoCache<K, V> implements
GravitinoCache<K, V> {
@Override
public Optional<V> getIfPresent(K key) {
- V value = cache.getIfPresent(key);
- return Optional.ofNullable(value);
+ return withReadLock(() -> Optional.ofNullable(cache.getIfPresent(key)));
+ }
+
+ @Override
+ public V get(K key, Function<K, V> loader) {
+ return withReadLock(
+ () ->
+ cache.get(
+ key,
+ k -> Objects.requireNonNull(loader.apply(k), "Cache loader
must not return null")));
}
@Override
public void put(K key, V value) {
- cache.put(key, value);
+ // Puts run under the shared read lock so that they may proceed
concurrently with reads and
+ // with other puts; only invalidations need exclusive access.
+ withReadLock(() -> cache.put(key, value));
}
@Override
public void invalidate(K key) {
- cache.invalidate(key);
+ withWriteLock(() -> cache.invalidate(key));
}
@Override
public void invalidateAll() {
- cache.invalidateAll();
+ withWriteLock(cache::invalidateAll);
}
@Override
public void invalidateByPrefix(String prefix) {
// Prefix invalidation scans all keys. It is intended for infrequent
structural invalidations
// such as dropping or renaming an entity hierarchy, not for per-request
hot paths.
- cache.asMap().keySet().removeIf(k -> k instanceof String && ((String)
k).startsWith(prefix));
+ withWriteLock(
+ () ->
+ cache
+ .asMap()
+ .keySet()
+ .removeIf(k -> k instanceof String && ((String)
k).startsWith(prefix)));
+ }
+
+ @Override
+ public void runInvalidationBatch(Runnable batch) {
+ // The write lock is reentrant, so individual invalidate* calls inside the
batch re-acquire it
+ // without deadlocking.
+ withWriteLock(batch);
}
@Override
public long size() {
- return cache.estimatedSize();
+ return withReadLock(cache::estimatedSize);
}
@VisibleForTesting
@@ -98,4 +161,31 @@ public class CaffeineGravitinoCache<K, V> implements
GravitinoCache<K, V> {
cache.invalidateAll();
cache.cleanUp();
}
+
+ private <T> T withReadLock(Supplier<T> action) {
+ lock.readLock().lock();
+ try {
+ return action.get();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private void withReadLock(Runnable action) {
+ lock.readLock().lock();
+ try {
+ action.run();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private void withWriteLock(Runnable action) {
+ lock.writeLock().lock();
+ try {
+ action.run();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java
b/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java
index 3771608e3c..7eaf839b50 100644
--- a/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/GravitinoCache.java
@@ -19,7 +19,9 @@
package org.apache.gravitino.cache;
import java.io.Closeable;
+import java.util.Objects;
import java.util.Optional;
+import java.util.function.Function;
/**
* A general-purpose cache interface used by the authorization subsystem.
Implementations include a
@@ -38,6 +40,26 @@ public interface GravitinoCache<K, V> extends Closeable {
*/
Optional<V> getIfPresent(K key);
+ /**
+ * Returns the value associated with the key, loading it if necessary.
Implementations with real
+ * storage should make the load atomic for the same key, and should
serialise concurrent reads
+ * against any {@link #invalidate(Object) invalidate} call so a reader
cannot observe a partially
+ * invalidated cache.
+ *
+ * @param key the cache key
+ * @param loader the loader invoked when the key is absent
+ * @return the cached or loaded value
+ */
+ default V get(K key, Function<K, V> loader) {
+ Optional<V> value = getIfPresent(key);
+ if (value.isPresent()) {
+ return value.get();
+ }
+ V loaded = Objects.requireNonNull(loader.apply(key), "Cache loader must
not return null");
+ put(key, loaded);
+ return loaded;
+ }
+
/**
* Associates the value with the key in the cache.
*
@@ -58,13 +80,29 @@ public interface GravitinoCache<K, V> extends Closeable {
/**
* Evicts all entries whose key is a String and starts with the given
prefix. Only meaningful when
- * K = String. Used by metadataIdCache for hierarchical cascade
invalidation: dropping a catalog
- * evicts the catalog entry plus all schema/table/fileset/... entries
beneath it.
+ * K = String. Used by metadataIdCache for path-based invalidation: dropping
a catalog evicts the
+ * catalog entry plus all schema/table/fileset/... entries under that
catalog name path.
*
* @param prefix the prefix to match against key strings
*/
void invalidateByPrefix(String prefix);
+ /**
+ * Runs {@code batch} so that all {@link #invalidate}, {@link
#invalidateAll}, and {@link
+ * #invalidateByPrefix} calls it makes appear atomic to concurrent readers —
i.e. readers see
+ * either the state from before the batch or the state after, never an
intermediate half-applied
+ * state.
+ *
+ * <p>Default implementation: just runs the runnable, suitable for caches
without real storage
+ * (e.g. {@link NoOpsGravitinoCache}). Storage-backed implementations should
hold an exclusive
+ * lock for the duration of the batch.
+ *
+ * @param batch the invalidation sequence to run atomically
+ */
+ default void runInvalidationBatch(Runnable batch) {
+ batch.run();
+ }
+
/**
* Returns the approximate number of entries in the cache.
*
diff --git
a/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java
b/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java
index 8c41eabeef..8d7bc53edd 100644
--- a/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/NoOpsGravitinoCache.java
@@ -18,7 +18,9 @@
*/
package org.apache.gravitino.cache;
+import java.util.Objects;
import java.util.Optional;
+import java.util.function.Function;
/**
* A no-op implementation of {@link GravitinoCache} that never caches
anything. Useful for testing
@@ -34,6 +36,11 @@ public class NoOpsGravitinoCache<K, V> implements
GravitinoCache<K, V> {
return Optional.empty();
}
+ @Override
+ public V get(K key, Function<K, V> loader) {
+ return Objects.requireNonNull(loader.apply(key), "Cache loader must not
return null");
+ }
+
@Override
public void put(K key, V value) {
// no-op
diff --git
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
index 3071e23f9a..e27b68bb91 100644
---
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
+++
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationUtils.java
@@ -31,8 +31,10 @@ import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.catalog.CatalogDispatcher;
+import org.apache.gravitino.catalog.CatalogManager;
import org.apache.gravitino.catalog.SchemaDispatcher;
import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
import org.apache.gravitino.exceptions.IllegalNamespaceException;
import org.apache.gravitino.meta.AuditInfo;
@@ -40,6 +42,7 @@ import org.apache.gravitino.meta.RoleEntity;
import org.apache.gravitino.rel.Table;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
class TestAuthorizationUtils {
@@ -289,4 +292,29 @@ class TestAuthorizationUtils {
Assertions.assertEquals(1, locations.size());
Assertions.assertEquals("schemaLocation", locations.get(0));
}
+
+ @Test
+ void testRenamePrivilegesNotifiesOldEntityNameIdMappingChange() {
+ GravitinoAuthorizer authorizer = Mockito.mock(GravitinoAuthorizer.class);
+ AccessControlDispatcher accessControlDispatcher =
Mockito.mock(AccessControlDispatcher.class);
+ CatalogManager catalogManager = Mockito.mock(CatalogManager.class);
+ BaseCatalog<?> baseCatalog = Mockito.mock(BaseCatalog.class);
+
Mockito.when(catalogManager.loadCatalog(Mockito.any())).thenReturn(baseCatalog);
+
+ GravitinoEnv envMock = Mockito.mock(GravitinoEnv.class);
+ Mockito.when(envMock.gravitinoAuthorizer()).thenReturn(authorizer);
+
Mockito.when(envMock.accessControlDispatcher()).thenReturn(accessControlDispatcher);
+ Mockito.when(envMock.catalogManager()).thenReturn(catalogManager);
+
+ try (MockedStatic<GravitinoEnv> envStatic =
Mockito.mockStatic(GravitinoEnv.class)) {
+ envStatic.when(GravitinoEnv::getInstance).thenReturn(envMock);
+
+ NameIdentifier ident = NameIdentifier.of("metalake", "catalog",
"schema", "table");
+ AuthorizationUtils.authorizationPluginRenamePrivileges(
+ ident, Entity.EntityType.TABLE, "new_table");
+
+ Mockito.verify(authorizer)
+ .handleEntityNameIdMappingChange("metalake", ident,
Entity.EntityType.TABLE);
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
index 56dd78f797..5226e5d13d 100644
---
a/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
+++
b/core/src/test/java/org/apache/gravitino/authorization/TestOwnerManager.java
@@ -43,14 +43,17 @@ import java.util.Collections;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.EntityStoreFactory;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
import org.apache.gravitino.catalog.CatalogManager;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
@@ -59,6 +62,7 @@ import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.SchemaVersion;
import org.apache.gravitino.meta.UserEntity;
@@ -243,4 +247,41 @@ public class TestOwnerManager {
ownerManager.setOwner(
METALAKE, metalakeObject, "non-existent-group",
Owner.Type.GROUP));
}
+
+ @Test
+ @Order(4)
+ public void testInitialOwnerSetNotifiesAuthorizer() throws
IllegalAccessException, IOException {
+ String catalogName = "catalog_initial_owner_notify";
+ AuditInfo audit =
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+ CatalogEntity catalog =
+ CatalogEntity.builder()
+ .withId(idGenerator.nextId())
+ .withName(catalogName)
+ .withNamespace(Namespace.of(METALAKE))
+ .withType(Catalog.Type.RELATIONAL)
+ .withProvider("test")
+ .withAuditInfo(audit)
+ .build();
+ entityStore.put(catalog, false);
+
+ GravitinoAuthorizer authorizer = Mockito.mock(GravitinoAuthorizer.class);
+ GravitinoAuthorizer originalAuthorizer =
GravitinoEnv.getInstance().gravitinoAuthorizer();
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "gravitinoAuthorizer",
authorizer, true);
+ try {
+ MetadataObject catalogObject =
+ MetadataObjects.of(Lists.newArrayList(catalogName),
MetadataObject.Type.CATALOG);
+
+ ownerManager.setOwner(METALAKE, catalogObject, USER, Owner.Type.USER);
+
+ Mockito.verify(authorizer)
+ .handleMetadataOwnerChange(
+ Mockito.eq(METALAKE),
+ Mockito.isNull(),
+ Mockito.eq(NameIdentifier.of(METALAKE, catalogName)),
+ Mockito.eq(Entity.EntityType.CATALOG));
+ } finally {
+ FieldUtils.writeField(
+ GravitinoEnv.getInstance(), "gravitinoAuthorizer",
originalAuthorizer, true);
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java
b/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java
index 132e142002..8cbf4e7232 100644
--- a/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java
+++ b/core/src/test/java/org/apache/gravitino/cache/TestGravitinoCache.java
@@ -19,7 +19,13 @@
package org.apache.gravitino.cache;
import com.github.benmanes.caffeine.cache.Ticker;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.awaitility.Awaitility;
@@ -53,6 +59,148 @@ public class TestGravitinoCache {
}
}
+ @Test
+ void testCaffeineGetLoadsSameKeyAtomically() throws Exception {
+ CaffeineGravitinoCache<String, Long> cache = new
CaffeineGravitinoCache<>(60_000L, 1000L);
+ ExecutorService executorService = Executors.newFixedThreadPool(8);
+ try {
+ AtomicLong loadCount = new AtomicLong();
+ CountDownLatch ready = new CountDownLatch(8);
+ CountDownLatch start = new CountDownLatch(1);
+ List<Future<Long>> futures = new ArrayList<>();
+
+ for (int i = 0; i < 8; i++) {
+ futures.add(
+ executorService.submit(
+ () -> {
+ ready.countDown();
+ start.await();
+ return cache.get(
+ "shared",
+ key -> {
+ loadCount.incrementAndGet();
+ return 100L;
+ });
+ }));
+ }
+
+ Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS));
+ start.countDown();
+ for (Future<Long> future : futures) {
+ Assertions.assertEquals(100L, future.get(5, TimeUnit.SECONDS));
+ }
+
+ Assertions.assertEquals(1L, loadCount.get());
+ Assertions.assertEquals(1L, cache.size());
+ } finally {
+ executorService.shutdownNow();
+ cache.close();
+ }
+ }
+
+ @Test
+ void testCaffeineInvalidateWaitsForInFlightReader() throws Exception {
+ CaffeineGravitinoCache<String, Long> cache = new
CaffeineGravitinoCache<>(60_000L, 1000L);
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ try {
+ CountDownLatch readerHoldsLock = new CountDownLatch(1);
+ CountDownLatch readerMayProceed = new CountDownLatch(1);
+
+ Future<Long> reader =
+ executorService.submit(
+ () ->
+ cache.get(
+ "k",
+ key -> {
+ readerHoldsLock.countDown();
+ try {
+ Assertions.assertTrue(readerMayProceed.await(5,
TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ return 42L;
+ }));
+
+ Assertions.assertTrue(readerHoldsLock.await(5, TimeUnit.SECONDS));
+ Future<?> invalidator = executorService.submit(() ->
cache.invalidate("k"));
+
+ // Invalidator should be parked on the write lock until the reader
releases its read lock.
+ Thread.sleep(150);
+ Assertions.assertFalse(
+ invalidator.isDone(), "invalidate must not proceed while a reader
holds the read lock");
+
+ readerMayProceed.countDown();
+ Assertions.assertEquals(42L, reader.get(5, TimeUnit.SECONDS));
+ invalidator.get(5, TimeUnit.SECONDS);
+
+ // Reader installed the value, invalidator then removed it.
+ Assertions.assertFalse(cache.getIfPresent("k").isPresent());
+ } finally {
+ executorService.shutdownNow();
+ cache.close();
+ }
+ }
+
+ @Test
+ void testCaffeineRunInvalidationBatchIsAtomicForReaders() throws Exception {
+ CaffeineGravitinoCache<String, Long> cache = new
CaffeineGravitinoCache<>(60_000L, 1000L);
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ try {
+ cache.put("k1", 1L);
+ cache.put("k2", 2L);
+ cache.put("k3", 3L);
+
+ CountDownLatch batchEntered = new CountDownLatch(1);
+ CountDownLatch batchMayProceed = new CountDownLatch(1);
+
+ Future<?> batcher =
+ executorService.submit(
+ () ->
+ cache.runInvalidationBatch(
+ () -> {
+ batchEntered.countDown();
+ try {
+ Assertions.assertTrue(batchMayProceed.await(5,
TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ cache.invalidate("k1");
+ cache.invalidate("k2");
+ cache.invalidate("k3");
+ }));
+
+ Assertions.assertTrue(batchEntered.await(5, TimeUnit.SECONDS));
+
+ // Reader started while the batcher holds the write lock must block
until the batch ends.
+ Future<Optional<Long>> reader = executorService.submit(() ->
cache.getIfPresent("k2"));
+ Thread.sleep(150);
+ Assertions.assertFalse(reader.isDone(), "reader must wait for the batch
to release the lock");
+
+ batchMayProceed.countDown();
+ batcher.get(5, TimeUnit.SECONDS);
+
+ // Reader observes the post-batch state, never a partially-invalidated
cache.
+ Assertions.assertFalse(reader.get(5, TimeUnit.SECONDS).isPresent());
+ Assertions.assertEquals(0, cache.size());
+ } finally {
+ executorService.shutdownNow();
+ cache.close();
+ }
+ }
+
+ @Test
+ void testCaffeineGetRejectsNullLoadResult() {
+ CaffeineGravitinoCache<String, Long> cache = new
CaffeineGravitinoCache<>(60_000L, 1000L);
+ try {
+ Assertions.assertThrows(NullPointerException.class, () ->
cache.get("key", key -> null));
+ Assertions.assertFalse(cache.getIfPresent("key").isPresent());
+ } finally {
+ cache.close();
+ }
+ }
+
@Test
void testCaffeineInvalidate() {
CaffeineGravitinoCache<String, String> cache = new
CaffeineGravitinoCache<>(60_000L, 1000L);
@@ -172,6 +320,23 @@ public class TestGravitinoCache {
}
}
+ @Test
+ void testNoOpsGetAlwaysLoadsAndDoesNotCache() {
+ NoOpsGravitinoCache<String, Long> cache = new NoOpsGravitinoCache<>();
+ try {
+ AtomicLong loadCount = new AtomicLong();
+
+ Assertions.assertEquals(1L, cache.get("key", key ->
loadCount.incrementAndGet()));
+ Assertions.assertEquals(2L, cache.get("key", key ->
loadCount.incrementAndGet()));
+
+ Assertions.assertEquals(2L, loadCount.get());
+ Assertions.assertFalse(cache.getIfPresent("key").isPresent());
+ Assertions.assertEquals(0, cache.size());
+ } finally {
+ cache.close();
+ }
+ }
+
@Test
void testCaffeineWithNonStringKeys() {
CaffeineGravitinoCache<Long, String> cache = new
CaffeineGravitinoCache<>(60_000L, 1000L);
diff --git a/docs/security/access-control.md b/docs/security/access-control.md
index 76858304c7..4bbb274e1e 100644
--- a/docs/security/access-control.md
+++ b/docs/security/access-control.md
@@ -451,6 +451,8 @@ To enable access control in Gravitino, configure the
following settings in your
| `gravitino.authorization.jcasbin.cacheExpirationSecs` | The expiration
time in seconds for authorization cache entries | `3600` | No
| 1.1.1 |
| `gravitino.authorization.jcasbin.roleCacheSize` | The maximum size
of the role cache for authorization | `10000` | No
| 1.1.1 |
| `gravitino.authorization.jcasbin.ownerCacheSize` | The maximum size
of the owner cache for authorization | `100000` | No
| 1.1.1 |
+| `gravitino.authorization.jcasbin.metadataIdCacheSize` | The maximum size
of the metadata ID cache for authorization | `100000` | No
| 1.3.0 |
+| `gravitino.authorization.jcasbin.changePollIntervalSecs` | The interval in
seconds for polling entity and owner changes | `3` | No
| 1.3.0 |
### Authorization Cache
@@ -462,6 +464,10 @@ Gravitino uses Caffeine caches to improve authorization
performance by caching r
- **`ownerCacheSize`**: Controls the maximum number of owner relationship
entries that can be cached. This cache maps metadata object IDs to their owner
IDs.
+- **`metadataIdCacheSize`**: Controls the maximum number of metadata
name-to-ID mapping entries that can be cached. This cache maps metadata object
names to internal metadata IDs used by JCasbin authorization checks.
+
+- **`changePollIntervalSecs`**: Controls how often a Gravitino server polls
persisted entity and owner changes to invalidate local JCasbin authorization
caches in multi-node deployments.
+
:::info
When role privileges or ownership are changed through the Gravitino API, the
corresponding cache entries are automatically invalidated to ensure
authorization decisions reflect the latest state.
:::
@@ -488,6 +494,8 @@ gravitino.authorization.serviceAdmins = admin1,admin2
gravitino.authorization.jcasbin.cacheExpirationSecs = 3600
gravitino.authorization.jcasbin.roleCacheSize = 10000
gravitino.authorization.jcasbin.ownerCacheSize = 100000
+gravitino.authorization.jcasbin.metadataIdCacheSize = 100000
+gravitino.authorization.jcasbin.changePollIntervalSecs = 3
```
## Migration Guide
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizationLookups.java
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizationLookups.java
new file mode 100644
index 0000000000..f9171bd72e
--- /dev/null
+++
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizationLookups.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.server.authorization.MetadataIdConverter;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+
+/**
+ * Two-tier metadata-id and owner resolution for {@link JcasbinAuthorizer}.
+ *
+ * <p>Each lookup is deduplicated within a single request via {@link
AuthorizationRequestContext},
+ * falls back to a shared {@link GravitinoCache} on a request miss, and
finally issues a single DB
+ * query on a cache miss. A successful DB fetch populates both tiers so
subsequent calls — in this
+ * request and later ones — hit the cache. The two underlying caches are
invalidated externally by
+ * {@link JcasbinChangePoller} (HA peers) and by the {@link
+ *
org.apache.gravitino.authorization.GravitinoAuthorizer#handleMetadataOwnerChange}
/ {@link
+ *
org.apache.gravitino.authorization.GravitinoAuthorizer#handleEntityNameIdMappingChange}
hooks
+ * (local mutations).
+ */
+public class JcasbinAuthorizationLookups {
+
+ /** Unit Separator for internal path-based cache keys. */
+ static final String KEY_SEP = "\u001F";
+
+ private final GravitinoCache<String, Long> metadataIdCache;
+ private final GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache;
+
+ /**
+ * Creates a new lookups facade around the supplied caches. The caches are owned by the caller
+ * and remain accessible for invalidation by other components (poller, change hooks).
+ *
+ * @param metadataIdCache path-based key → entity id. Keys are produced by {@link
+ *     #buildCacheKey} and join their segments with {@code KEY_SEP} (U+001F), e.g. {@code
+ *     metalake<sep>catalog<sep>schema<sep>table<sep>TABLE}
+ * @param ownerRelCache {@code metadataObjectId} → {@link Optional} of {@link OwnerInfo}
+ */
+ public JcasbinAuthorizationLookups(
+ GravitinoCache<String, Long> metadataIdCache,
+ GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache) {
+ this.metadataIdCache = metadataIdCache;
+ this.ownerRelCache = ownerRelCache;
+ }
+
+ /**
+ * Two-tier name→id lookup: the per-request map in {@code requestContext} dedups calls within the
+ * same HTTP request; on a miss, the long-lived {@code metadataIdCache} is consulted, and finally
+ * we fall back to a DB query via {@link MetadataIdConverter#getID}. Nothing is caught here, so a
+ * failure of that final lookup propagates to the caller.
+ *
+ * @param metadataObject object to resolve
+ * @param metalake metalake the object lives in; also the first segment of the cache key
+ * @param requestContext per-request context used for request-level dedup
+ * @return the entity id for {@code metadataObject}
+ */
+ public Long resolveMetadataId(
+ MetadataObject metadataObject, String metalake,
AuthorizationRequestContext requestContext) {
+ String cacheKey = buildCacheKey(metalake, metadataObject);
+ return requestContext.computeMetadataIdIfAbsent(
+ cacheKey,
+ k ->
+ metadataIdCache.get(k, ignored ->
MetadataIdConverter.getID(metadataObject, metalake)));
+ }
+
+ /**
+ * Two-tier owner lookup: request-level dedup first, then the shared {@code ownerRelCache}, and
+ * finally a single {@code owner_meta} query. A successful DB fetch populates both tiers so
+ * subsequent {@code isOwner} calls — in this request and later ones — hit the cache.
+ *
+ * <p>Despite the method name, the result is the whole {@link OwnerInfo} row rather than just an
+ * id. A {@code null} query result is wrapped as {@link Optional#empty()} inside the loader, so
+ * the loader never hands {@code null} to the cache and "no owner" is returned as an ordinary
+ * value.
+ *
+ * <p>NOTE(review): both tiers are keyed by {@code metadataId} alone although the query also
+ * filters by type — this assumes metadata ids are unique across types; confirm.
+ *
+ * @param metadataId id of the metadata object whose owner is wanted
+ * @param metadataType type of that object; its {@code name()} is passed to the mapper query
+ * @param requestContext per-request context used for request-level dedup
+ * @return the owner row, or empty when the query finds none
+ */
+ public Optional<OwnerInfo> resolveOwnerId(
+ Long metadataId,
+ MetadataObject.Type metadataType,
+ AuthorizationRequestContext requestContext) {
+ return requestContext.computeOwnerIfAbsent(
+ metadataId,
+ id ->
+ ownerRelCache.get(
+ id,
+ ignored -> {
+ OwnerInfo ownerInfo =
+ SessionUtils.getWithoutCommit(
+ OwnerMetaMapper.class,
+ m -> m.selectOwnerByMetadataObjectIdAndType(id,
metadataType.name()));
+ return ownerInfo == null ? Optional.empty() :
Optional.of(ownerInfo);
+ }));
+ }
+
+ /** Underlying metadata-id cache; exposed for invalidation by the change
hooks and the poller. */
+ public GravitinoCache<String, Long> metadataIdCache() {
+ return metadataIdCache;
+ }
+
+ /** Underlying owner cache; exposed for invalidation by the change hooks and
the poller. */
+ public GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache() {
+ return ownerRelCache;
+ }
+
+ /**
+ * Builds a path-based cache key for the metadataIdCache. Container objects end with the internal
+ * separator so a prefix invalidation can remove the container and all entries under the same
+ * name path; leaf objects end with their type name instead.
+ *
+ * <p>Examples: {@code metalake<sep>}, {@code metalake<sep>catalog<sep>}, {@code
+ * metalake<sep>catalog<sep>schema<sep>}, {@code
+ * metalake<sep>catalog<sep>schema<sep>table<sep>TABLE}, where {@code <sep>} stands for {@code
+ * KEY_SEP}.
+ *
+ * <p>NOTE(review): the full name is split on every dot, so this assumes no single name segment
+ * contains a dot — confirm against the naming rules.
+ *
+ * @param metalake metalake name; always the first key segment
+ * @param metadataObject object whose type and full name determine the remaining segments; for a
+ *     {@code METALAKE} object only {@code metalake} is used
+ * @return the cache key
+ */
+ @VisibleForTesting
+ public static String buildCacheKey(String metalake, MetadataObject
metadataObject) {
+ if (metadataObject.type() == MetadataObject.Type.METALAKE) {
+ return metalake + KEY_SEP;
+ }
+ StringBuilder sb = new StringBuilder(metalake);
+ sb.append(KEY_SEP);
+ // fullName uses '.' as separator, e.g. "catalog1.schema1.table1"
+ String[] parts = metadataObject.fullName().split("\\.");
+ sb.append(String.join(KEY_SEP, parts));
+ if (isContainerType(metadataObject.type())) {
+ // Trailing separator enables prefix-based invalidation.
+ sb.append(KEY_SEP);
+ } else {
+ // Leaf nodes get the type suffix to avoid collisions
+ sb.append(KEY_SEP);
+ sb.append(metadataObject.type().name());
+ }
+ return sb.toString();
+ }
+
+ /** Returns true for entity types that can contain children (metalake,
catalog, schema). */
+ @VisibleForTesting
+ public static boolean isContainerType(MetadataObject.Type type) {
+ return type == MetadataObject.Type.METALAKE
+ || type == MetadataObject.Type.CATALOG
+ || type == MetadataObject.Type.SCHEMA;
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 3778f8b19f..2929bf7415 100644
---
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -56,12 +56,14 @@ import
org.apache.gravitino.authorization.AuthorizationUtils;
import org.apache.gravitino.authorization.GravitinoAuthorizer;
import org.apache.gravitino.authorization.Privilege;
import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.cache.CaffeineGravitinoCache;
+import org.apache.gravitino.cache.GravitinoCache;
import org.apache.gravitino.exceptions.NoSuchUserException;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.RoleEntity;
import org.apache.gravitino.meta.UserEntity;
import org.apache.gravitino.server.authorization.MetadataIdConverter;
-import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.PrincipalUtils;
import org.casbin.jcasbin.main.Enforcer;
@@ -93,7 +95,17 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
*/
private Cache<Long, Boolean> loadedRoles;
- private Cache<Long, Optional<OwnerInfo>> ownerRel;
+ /** Path-based key built by {@code JcasbinAuthorizationLookups.buildCacheKey} → entity id. */
+ private GravitinoCache<String, Long> metadataIdCache;
+
+ /** {@code metadataObjectId} → {@link Optional} of {@link OwnerInfo}. */
+ private GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache;
+
+ /** Two-tier lookup facade (per-request dedup + shared cache + DB fallback).
*/
+ private JcasbinAuthorizationLookups lookups;
+
+ /** Background HA invalidator for {@link #metadataIdCache} and {@link
#ownerRelCache}. */
+ private JcasbinChangePoller changePoller;
private Executor executor = null;
@@ -107,6 +119,16 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
GravitinoEnv.getInstance().config().get(Configs.GRAVITINO_AUTHORIZATION_ROLE_CACHE_SIZE);
long ownerCacheSize =
GravitinoEnv.getInstance().config().get(Configs.GRAVITINO_AUTHORIZATION_OWNER_CACHE_SIZE);
+ long metadataIdCacheSize =
+ GravitinoEnv.getInstance()
+ .config()
+ .get(Configs.GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE);
+ long pollIntervalSecs =
+ GravitinoEnv.getInstance()
+ .config()
+ .get(Configs.GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS);
+
+ long ttlMs = TimeUnit.SECONDS.toMillis(cacheExpirationSecs);
// Initialize enforcers before the caches that reference them in removal
listeners
allowEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new
GravitinoAdapter());
@@ -127,11 +149,14 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
}
})
.build();
- ownerRel =
- Caffeine.newBuilder()
- .expireAfterAccess(cacheExpirationSecs, TimeUnit.SECONDS)
- .maximumSize(ownerCacheSize)
- .build();
+ // The change poller is the primary HA invalidation path. These
write-based TTLs bound the
+ // stale window if a poll cycle misses a change; access-based TTLs could
keep hot stale entries
+ // alive indefinitely.
+ metadataIdCache = new CaffeineGravitinoCache<>(ttlMs, metadataIdCacheSize);
+ ownerRelCache = new CaffeineGravitinoCache<>(ttlMs, ownerCacheSize);
+ lookups = new JcasbinAuthorizationLookups(metadataIdCache, ownerRelCache);
+ changePoller = new JcasbinChangePoller(metadataIdCache, ownerRelCache,
pollIntervalSecs);
+ changePoller.start();
executor =
Executors.newFixedThreadPool(
GravitinoEnv.getInstance()
@@ -226,9 +251,10 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
AuthorizationRequestContext requestContext) {
boolean result;
try {
- Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
- loadOwnerPolicy(metalake, metadataObject, metadataId);
- result = checkOwnership(principal, metalake, metadataId);
+ Long metadataId = lookups.resolveMetadataId(metadataObject, metalake,
requestContext);
+ Optional<OwnerInfo> owner =
+ lookups.resolveOwnerId(metadataId, metadataObject.type(),
requestContext);
+ result = ownerMatchesUserOrGroups(owner, principal, metalake);
} catch (Exception e) {
LOG.debug("Can not get entity id", e);
result = false;
@@ -421,18 +447,55 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
public void handleMetadataOwnerChange(
String metalake, Long oldOwnerId, NameIdentifier nameIdentifier,
Entity.EntityType type) {
MetadataObject metadataObject =
NameIdentifierUtil.toMetadataObject(nameIdentifier, type);
- Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
- ownerRel.invalidate(metadataId);
+ // Owner mutations may happen after drop/recreate with the same name.
Invalidate the
+ // name->id mapping as well to prevent using a stale metadataId from
metadataIdCache.
+
metadataIdCache.invalidate(JcasbinAuthorizationLookups.buildCacheKey(metalake,
metadataObject));
+ try {
+ Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+ ownerRelCache.invalidate(metadataId);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to resolve metadata id for owner cache invalidation:
{}", metadataObject, e);
+ }
+ }
+
+ @Override
+ public void handleEntityNameIdMappingChange(
+ String metalake, NameIdentifier nameIdentifier, Entity.EntityType type) {
+ MetadataObject metadataObject =
NameIdentifierUtil.toMetadataObject(nameIdentifier, type);
+ String cacheKey = JcasbinAuthorizationLookups.buildCacheKey(metalake,
metadataObject);
+ if (JcasbinAuthorizationLookups.isContainerType(metadataObject.type())) {
+ // Prefix invalidation: a container key (e.g. metalake<sep>catalog<sep>) removes the container and all of its children.
+ metadataIdCache.invalidateByPrefix(cacheKey);
+ } else {
+ metadataIdCache.invalidate(cacheKey);
+ }
}
@Override
public void close() throws IOException {
+ if (changePoller != null) {
+ changePoller.close();
+ }
if (executor != null) {
if (executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
threadPoolExecutor.shutdown();
+ try {
+ if (!threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ threadPoolExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ threadPoolExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
}
}
+ if (metadataIdCache != null) {
+ metadataIdCache.close();
+ }
+ if (ownerRelCache != null) {
+ ownerRelCache.close();
+ }
}
private class InternalAuthorizer {
@@ -464,13 +527,14 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
try {
UserEntity userEntity = getUserEntity(username, metalake);
userId = userEntity.id();
- metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+ metadataId = lookups.resolveMetadataId(metadataObject, metalake,
requestContext);
} catch (Exception e) {
LOG.debug("Can not get entity id", e);
return false;
}
loadRolePrivilege(metalake, username, userId, requestContext);
- return authorizeByJcasbin(userId, metalake, metadataObject, metadataId,
privilege);
+ return authorizeByJcasbin(
+ userId, metalake, metadataObject, metadataId, privilege,
requestContext);
}
private boolean authorizeByJcasbin(
@@ -478,9 +542,12 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
String metalake,
MetadataObject metadataObject,
Long metadataId,
- String privilege) {
+ String privilege,
+ AuthorizationRequestContext requestContext) {
if (AuthConstants.OWNER.equals(privilege)) {
- return checkOwnership(PrincipalUtils.getCurrentPrincipal(), metalake,
metadataId);
+ Optional<OwnerInfo> owner =
+ lookups.resolveOwnerId(metadataId, metadataObject.type(),
requestContext);
+ return ownerMatchesUserOrGroups(owner,
PrincipalUtils.getCurrentPrincipal(), metalake);
}
return enforcer.enforce(
String.valueOf(userId),
@@ -627,43 +694,6 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
loadRoleFutures.add(loadRoleFuture);
}
- private void loadOwnerPolicy(String metalake, MetadataObject metadataObject,
Long metadataId) {
- if (ownerRel.getIfPresent(metadataId) != null) {
- LOG.debug("Metadata {} OWNER has been loaded.", metadataId);
- return;
- }
- try {
- NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
- EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
- List<? extends Entity> owners =
- entityStore
- .relationOperations()
- .listEntitiesByRelation(
- SupportsRelationOperations.Type.OWNER_REL,
- entityIdent,
- Entity.EntityType.valueOf(metadataObject.type().name()));
- if (owners.isEmpty()) {
- ownerRel.put(metadataId, Optional.empty());
- } else {
- for (Entity ownerEntity : owners) {
- if (ownerEntity instanceof UserEntity) {
- UserEntity user = (UserEntity) ownerEntity;
- ownerRel.put(
- metadataId,
- Optional.of(new OwnerInfo(user.id(), Entity.EntityType.USER,
user.name())));
- } else if (ownerEntity instanceof GroupEntity) {
- GroupEntity group = (GroupEntity) ownerEntity;
- ownerRel.put(
- metadataId,
- Optional.of(new OwnerInfo(group.id(), Entity.EntityType.GROUP,
group.name())));
- }
- }
- }
- } catch (IOException e) {
- LOG.warn("Can not load metadata owner", e);
- }
- }
-
private void loadPolicyByRoleEntity(RoleEntity roleEntity) {
String metalake =
NameIdentifierUtil.getMetalake(roleEntity.nameIdentifier());
List<SecurableObject> securableObjects = roleEntity.securableObjects();
@@ -702,63 +732,49 @@ public class JcasbinAuthorizer implements
GravitinoAuthorizer {
}
/**
- * Checks whether the given principal is the owner of the metadata object
identified by
- * metadataId. Supports both user and group ownership.
+ * Returns true when the resolved owner matches the current user (by id) or
one of the user's
+ * groups. We compare by entity id rather than name so the cached snapshot
survives a
+ * delete-then-recreate-with-same-name scenario without spuriously granting
ownership to the new
+ * entity.
*/
- private boolean checkOwnership(Principal principal, String metalake, Long
metadataId) {
- Optional<OwnerInfo> ownerOpt = ownerRel.getIfPresent(metadataId);
- if (ownerOpt == null || !ownerOpt.isPresent()) {
+ private boolean ownerMatchesUserOrGroups(
+ Optional<OwnerInfo> owner, Principal principal, String metalake) {
+ if (!owner.isPresent()) {
return false;
}
- OwnerInfo owner = ownerOpt.get();
- // We compare by entity ID rather than name to guard against stale cache
entries.
- // If a user/group is deleted and recreated with the same name, the cached
OwnerInfo
- // still holds the old ID. A name-only comparison would incorrectly grant
ownership
- // to the new entity. The extra IO to fetch the current entity ensures
correctness.
- if (owner.type == Entity.EntityType.USER) {
+ OwnerInfo ownerInfo = owner.get();
+ if
(Entity.EntityType.USER.name().equalsIgnoreCase(ownerInfo.getOwnerType())) {
try {
UserEntity userEntity = getUserEntity(principal.getName(), metalake);
- return Objects.equals(userEntity.id(), owner.id);
+ return Objects.equals(userEntity.id(), ownerInfo.getOwnerId());
} catch (Exception e) {
LOG.debug("Can not get user entity for ownership check", e);
return false;
}
- } else if (owner.type == Entity.EntityType.GROUP) {
- if (principal instanceof UserPrincipal) {
- List<UserGroup> groups = ((UserPrincipal) principal).getGroups();
- if (groups.isEmpty()) {
- return false;
- }
- try {
- List<NameIdentifier> groupIdents =
- groups.stream()
- .map(g -> NameIdentifierUtil.ofGroup(metalake,
g.getGroupname()))
- .collect(Collectors.toList());
- List<GroupEntity> groupEntities =
- GravitinoEnv.getInstance()
- .entityStore()
- .batchGet(groupIdents, Entity.EntityType.GROUP,
GroupEntity.class);
- return groupEntities.stream().anyMatch(ge -> Objects.equals(ge.id(),
owner.id));
- } catch (Exception e) {
- LOG.debug("Can not get group entities for ownership check", e);
- return false;
- }
- }
+ }
+ if
(!Entity.EntityType.GROUP.name().equalsIgnoreCase(ownerInfo.getOwnerType())) {
return false;
}
- return false;
- }
-
- /** Holds the owner identity for a metadata object in the owner cache. */
- static class OwnerInfo {
- final Long id;
- final Entity.EntityType type;
- final String name;
-
- OwnerInfo(Long id, Entity.EntityType type, String name) {
- this.id = id;
- this.type = type;
- this.name = name;
+ if (!(principal instanceof UserPrincipal)) {
+ return false;
+ }
+ List<UserGroup> groups = ((UserPrincipal) principal).getGroups();
+ if (groups.isEmpty()) {
+ return false;
+ }
+ try {
+ List<NameIdentifier> groupIdents =
+ groups.stream()
+ .map(g -> NameIdentifierUtil.ofGroup(metalake, g.getGroupname()))
+ .collect(Collectors.toList());
+ List<GroupEntity> groupEntities =
+ GravitinoEnv.getInstance()
+ .entityStore()
+ .batchGet(groupIdents, Entity.EntityType.GROUP,
GroupEntity.class);
+ return groupEntities.stream().anyMatch(ge -> Objects.equals(ge.id(),
ownerInfo.getOwnerId()));
+ } catch (Exception e) {
+ LOG.debug("Can not get group entities for ownership check", e);
+ return false;
}
}
}
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinChangePoller.java
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinChangePoller.java
new file mode 100644
index 0000000000..611cc9616b
--- /dev/null
+++
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinChangePoller.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.po.auth.ChangedOwnerInfo;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.apache.gravitino.storage.relational.po.cache.EntityChangeRecord;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Eventual-consistency invalidator for {@link JcasbinAuthorizer}'s {@code
metadataIdCache} and
+ * {@code ownerRelCache}.
+ *
+ * <p>One scheduled thread drains {@code entity_change_log} and {@code
owner_meta} change rows since
+ * a high-water-mark cursor and invalidates the affected keys. Other Gravitino
nodes therefore
+ * observe ALTER/DROP and owner changes within one poll interval.
+ *
+ * <p>Both polls run on every tick — a failure in one does not stop the other.
+ */
+public class JcasbinChangePoller implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JcasbinChangePoller.class);
+
+ /** Max entity-change rows to fetch per poller cycle. */
+ private static final int ENTITY_CHANGE_POLLER_MAX_ROWS = 500;
+
+ private final GravitinoCache<String, Long> metadataIdCache;
+ private final GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache;
+ private final long pollIntervalSecs;
+
+ private ScheduledExecutorService scheduler;
+ private volatile long ownerPollHighWaterId = 0;
+ private volatile long entityPollHighWaterId = 0;
+
+ /**
+ * @param metadataIdCache the metadata-id cache to invalidate on entity
changes
+ * @param ownerRelCache the owner cache to invalidate on owner changes
+ * @param pollIntervalSecs interval between successive polling cycles
+ */
+ public JcasbinChangePoller(
+ GravitinoCache<String, Long> metadataIdCache,
+ GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache,
+ long pollIntervalSecs) {
+ Preconditions.checkArgument(pollIntervalSecs > 0, "pollIntervalSecs must
be positive");
+ this.metadataIdCache = metadataIdCache;
+ this.ownerRelCache = ownerRelCache;
+ this.pollIntervalSecs = pollIntervalSecs;
+ }
+
+ /**
+ * Initializes the high-water cursors to the current DB tail (so startup does not scan historical
+ * changes) and schedules periodic polling. Both cursor queries run synchronously on the calling
+ * thread; the first poll fires one full {@code pollIntervalSecs} after this call, on a daemon
+ * thread named {@code GravitinoAuthorizer-ChangePoller}.
+ *
+ * <p>Call this once: each invocation creates a new scheduler and overwrites the {@code scheduler}
+ * field without shutting down a previous one.
+ *
+ * <p>Known trade-off: an id-based high-water mark can miss rows whose id is allocated before the
+ * cursor snapshot but whose commit lands after it. Concretely, if writer A holds {@code id=N-1}
+ * uncommitted while writer B commits {@code id=N}, {@code selectMaxChangeId()} returns N and the
+ * next poll queries {@code id > N} — A's row is never consumed. In that case the affected cache
+ * entry stays stale until either (a) a request-side path catches it on the next request, or (b)
+ * TTL eviction. The owner poll advances its cursor to the largest id it has seen, so it is
+ * exposed to the same gap on every cycle, not only at startup. Acceptable for the
+ * eventual-consistency caches targeted here; revisit if we ever route strong-consistency data
+ * through this poller.
+ */
+ public void start() {
+ ownerPollHighWaterId =
+ getOrDefault(
+ SessionUtils.getWithoutCommit(
+ OwnerMetaMapper.class, OwnerMetaMapper::selectMaxChangeId));
+ entityPollHighWaterId =
+ getOrDefault(
+ SessionUtils.getWithoutCommit(
+ EntityChangeLogMapper.class,
EntityChangeLogMapper::selectMaxChangeId));
+
+ scheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t = new Thread(r);
+ t.setName("GravitinoAuthorizer-ChangePoller");
+ t.setDaemon(true);
+ return t;
+ });
+ scheduler.scheduleWithFixedDelay(
+ this::pollChanges, pollIntervalSecs, pollIntervalSecs,
TimeUnit.SECONDS);
+ }
+
+ @VisibleForTesting
+ void pollChanges() {
+ try {
+ LOG.debug("Polling for owner changes after id {}", ownerPollHighWaterId);
+ pollOwnerChanges();
+ } catch (Exception e) {
+ if (handleInterruptIfAny(e, "Owner change poll")) {
+ return;
+ }
+ LOG.warn("Owner change poll failed", e);
+ }
+
+ try {
+ LOG.debug("Polling for entity changes after id {}",
entityPollHighWaterId);
+ pollEntityChanges();
+ } catch (Exception e) {
+ if (handleInterruptIfAny(e, "Entity change poll")) {
+ return;
+ }
+ LOG.warn("Entity change poll failed", e);
+ }
+ }
+
+ /**
+ * Restores the interrupt flag and returns true if {@code e} carries (or the current thread has
+ * accumulated) an interruption, so the caller can bail out of this poll cycle quickly during
+ * {@link #close()}-driven {@code shutdownNow()}.
+ *
+ * <p>The whole cause chain of {@code e} is searched for an {@link InterruptedException}. The
+ * fallback check uses {@code isInterrupted()}, which leaves the flag set.
+ *
+ * @param e the exception caught by the poll cycle
+ * @param context short label used as the prefix of the debug log message
+ * @return true if the poll cycle should stop because of an interruption, false otherwise
+ */
+ private static boolean handleInterruptIfAny(Throwable e, String context) {
+ Throwable t = e;
+ while (t != null) {
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ LOG.debug("{} interrupted, stopping poll cycle", context);
+ return true;
+ }
+ t = t.getCause();
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.debug("{} ran while thread was interrupted, stopping poll cycle",
context);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Drains owner-change rows past {@link #ownerPollHighWaterId} and invalidates the affected
+ * {@code ownerRelCache} entries. Each row carries {@code metadataObjectId}, so invalidation is a
+ * direct key removal — no name resolution needed. The cursor is advanced to the largest row id
+ * seen only after the batch call returns; if the batch throws, the cursor is left untouched and
+ * the same rows are fetched again on the next poll.
+ *
+ * <p>The {@code synchronized} modifier is defensive. In production this method is only invoked
+ * from the single-threaded scheduler started in {@link #start()}, and {@link
+ * java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay} guarantees that
+ * consecutive runs do not overlap. The cursor field {@link #ownerPollHighWaterId} is also {@code
+ * volatile}, and the batch of invalidations is applied atomically at the cache layer via {@link
+ * org.apache.gravitino.cache.GravitinoCache#runInvalidationBatch}. The keyword is kept so that
+ * future callers — additional schedulers, ad-hoc invocations from tests or admin tooling — do not
+ * silently introduce concurrent {@code "select changes → invalidate → advance cursor"} sequences.
+ * With a single caller the monitor is never contended, so the cost is negligible. This does not
+ * depend on biased locking, which has been disabled by default since JDK 15.
+ */
+ private synchronized void pollOwnerChanges() {
+ List<ChangedOwnerInfo> changes =
+ SessionUtils.getWithoutCommit(
+ OwnerMetaMapper.class, m ->
m.selectChangedOwners(ownerPollHighWaterId));
+ if (changes.isEmpty()) {
+ return;
+ }
+
+ long[] maxSeenId = {ownerPollHighWaterId};
+ // Hold the cache's exclusive invalidation lock for the whole batch so
readers never observe
+ // a half-applied state where some of this batch's entries have been
evicted and others are
+ // still hot.
+ ownerRelCache.runInvalidationBatch(
+ () -> {
+ for (ChangedOwnerInfo change : changes) {
+ ownerRelCache.invalidate(change.getMetadataObjectId());
+ if (change.getId() > maxSeenId[0]) {
+ maxSeenId[0] = change.getId();
+ }
+ }
+ });
+ ownerPollHighWaterId = maxSeenId[0];
+ }
+
+ /**
+  * Drains entity-change rows past {@link #entityPollHighWaterId} and invalidates the affected
+  * {@code metadataIdCache} keys, then advances the cursor to the highest row id seen.
+  *
+  * <p><b>Contract with the writer side:</b> {@code entity_change_log.full_name} must be the
+  * <i>pre-mutation</i> name (the name that consumers currently have cached). The writers in
+  * {@code SchemaMetaService} / {@code TableMetaService} / etc. emit {@code oldFullName} on rename
+  * and the current name on drop, so the cache key built here resolves to the entry a peer node
+  * would have populated under that name. If a future change starts emitting the post-rename
+  * name, this invalidation will silently miss and stale entries will only clear via eviction.
+  *
+  * <p><b>Malformed rows:</b> a row whose entity type is unknown, whose type or name is {@code
+  * null}, or whose name cannot be turned into a {@link MetadataObject} of that type is logged and
+  * skipped, and the cursor still moves past it. Without this, one bad row would fail every
+  * subsequent poll at the same position and block all later invalidations.
+  *
+  * <p>Container-type changes are collected as prefixes and other types as exact keys; both sets
+  * are applied in one batch by {@link #invalidateCoalescedKeys}.
+  *
+  * <p>The {@code synchronized} modifier is defensive: it prevents any additional caller from
+  * interleaving two "select changes, invalidate, advance cursor" sequences.
+  */
+ private synchronized void pollEntityChanges() {
+   List<EntityChangeRecord> changes =
+       SessionUtils.getWithoutCommit(
+           EntityChangeLogMapper.class,
+           m -> m.selectEntityChanges(entityPollHighWaterId, ENTITY_CHANGE_POLLER_MAX_ROWS));
+
+   long maxSeenId = entityPollHighWaterId;
+   Set<String> containerPrefixes = new LinkedHashSet<>();
+   Set<String> leafKeys = new LinkedHashSet<>();
+   for (EntityChangeRecord change : changes) {
+     // Track the cursor first so that every row, including skipped ones, is consumed.
+     if (change.getId() > maxSeenId) {
+       maxSeenId = change.getId();
+     }
+
+     String metalake = change.getMetalakeName();
+     String entityType = change.getEntityType();
+     String fullName = change.getFullName();
+     if (entityType == null || fullName == null) {
+       LOG.warn(
+           "Skipping entity change log row {} with missing type or name: type={}, fullName={}",
+           change.getId(),
+           entityType,
+           fullName);
+       continue;
+     }
+
+     MetadataObject.Type mdType;
+     try {
+       mdType = MetadataObject.Type.valueOf(entityType.toUpperCase(Locale.ROOT));
+     } catch (IllegalArgumentException e) {
+       LOG.warn("Unknown entity type in change log: {}", entityType);
+       continue;
+     }
+
+     String cacheKey;
+     try {
+       MetadataObject mdObj = metadataObjectFromChangeLog(metalake, fullName, mdType);
+       cacheKey = JcasbinAuthorizationLookups.buildCacheKey(metalake, mdObj);
+     } catch (IllegalArgumentException e) {
+       // The name does not fit the declared type (e.g. wrong number of segments).
+       LOG.warn(
+           "Skipping entity change log row {}: cannot build a {} object from name '{}'",
+           change.getId(),
+           mdType,
+           fullName,
+           e);
+       continue;
+     }
+
+     if (JcasbinAuthorizationLookups.isContainerType(mdType)) {
+       addCoalescedPrefix(containerPrefixes, cacheKey);
+     } else {
+       leafKeys.add(cacheKey);
+     }
+   }
+   invalidateCoalescedKeys(containerPrefixes, leafKeys);
+   entityPollHighWaterId = maxSeenId;
+ }
+
+ /**
+  * Stops the polling scheduler, if one exists.
+  *
+  * <p>Requests an orderly shutdown and waits up to five seconds for it to finish; if it does not
+  * finish in time, or the waiting thread is interrupted, the scheduler is shut down forcibly. On
+  * interruption the caller's interrupt flag is set again.
+  */
+ @Override
+ public void close() {
+   if (scheduler == null) {
+     return;
+   }
+
+   scheduler.shutdown();
+   boolean drained;
+   try {
+     drained = scheduler.awaitTermination(5, TimeUnit.SECONDS);
+   } catch (InterruptedException e) {
+     scheduler.shutdownNow();
+     Thread.currentThread().interrupt();
+     return;
+   }
+   if (!drained) {
+     scheduler.shutdownNow();
+   }
+ }
+
+ /**
+  * Builds the {@link MetadataObject} that a change-log row refers to.
+  *
+  * <p>{@code fullName} is split on {@code '.'}. For every type except {@code METALAKE}, a leading
+  * segment equal to {@code metalake} is treated as a metalake qualifier and dropped, but only when
+  * other segments remain. A single-segment name is always the object's own name, even when it
+  * happens to equal the metalake name, and stripping it would leave an empty name list.
+  *
+  * <p>NOTE(review): a multi-segment name whose first segment is a catalog named exactly like the
+  * metalake is still indistinguishable from a metalake-qualified name here; confirm which form
+  * the change-log writers emit.
+  *
+  * @param metalake name of the metalake the row belongs to
+  * @param fullName dot-separated object name from the change log; must not be {@code null}
+  * @param type metadata object type of the row
+  * @return the metadata object built from the remaining name segments and {@code type}
+  */
+ @VisibleForTesting
+ static MetadataObject metadataObjectFromChangeLog(
+     String metalake, String fullName, MetadataObject.Type type) {
+   List<String> names = new ArrayList<>(Arrays.asList(fullName.split("\\.")));
+   if (type != MetadataObject.Type.METALAKE
+       && names.size() > 1
+       && Objects.equals(names.get(0), metalake)) {
+     names.remove(0);
+   }
+   return MetadataObjects.of(names, type);
+ }
+
+ /** Unboxes {@code value}, mapping {@code null} to {@code 0}. */
+ private static long getOrDefault(Long value) {
+   if (value == null) {
+     return 0L;
+   }
+   return value;
+ }
+
+ /**
+  * Adds {@code candidate} to {@code prefixes} while keeping the set free of redundant entries.
+  *
+  * <p>If an existing prefix already covers {@code candidate} (the candidate starts with it), the
+  * set is left unchanged. Otherwise every existing prefix that the candidate covers is removed
+  * before the candidate is added.
+  *
+  * @param prefixes mutable set of prefixes collected so far
+  * @param candidate prefix to merge into the set
+  */
+ private static void addCoalescedPrefix(Set<String> prefixes, String candidate) {
+   boolean alreadyCovered = prefixes.stream().anyMatch(candidate::startsWith);
+   if (alreadyCovered) {
+     return;
+   }
+   prefixes.removeIf(existing -> existing.startsWith(candidate));
+   prefixes.add(candidate);
+ }
+
+ /**
+  * Applies one batch of invalidations to {@code metadataIdCache}.
+  *
+  * <p>All prefixes are invalidated first. A leaf key is then invalidated individually only when
+  * none of the prefixes is a prefix of it. The whole batch runs inside {@code
+  * runInvalidationBatch} so that it is applied as one unit at the cache layer.
+  *
+  * @param prefixes key prefixes to invalidate
+  * @param leafKeys exact keys to invalidate
+  */
+ private void invalidateCoalescedKeys(Set<String> prefixes, Set<String> leafKeys) {
+   boolean nothingToInvalidate = prefixes.isEmpty() && leafKeys.isEmpty();
+   if (nothingToInvalidate) {
+     return;
+   }
+
+   metadataIdCache.runInvalidationBatch(
+       () -> {
+         prefixes.forEach(prefix -> metadataIdCache.invalidateByPrefix(prefix));
+         for (String leafKey : leafKeys) {
+           boolean coveredByPrefix = false;
+           for (String prefix : prefixes) {
+             if (leafKey.startsWith(prefix)) {
+               coveredByPrefix = true;
+               break;
+             }
+           }
+           if (!coveredByPrefix) {
+             metadataIdCache.invalidate(leafKey);
+           }
+         }
+       });
+ }
+}
diff --git
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizationLookups.java
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizationLookups.java
new file mode 100644
index 0000000000..cfab7f8010
--- /dev/null
+++
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizationLookups.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.authorization.AuthorizationRequestContext;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link JcasbinAuthorizationLookups}: cache-key construction, container-type
+ * classification, and how the id / owner lookups use the shared cache.
+ */
+public class TestJcasbinAuthorizationLookups {
+
+  private static final String METALAKE = "ml1";
+
+  // ---------- buildCacheKey ----------
+
+  @Test
+  void testBuildCacheKeyMetalake() {
+    MetadataObject metalake = singleName("ml1", MetadataObject.Type.METALAKE);
+    Assertions.assertEquals(key("ml1", ""), cacheKeyOf(metalake));
+  }
+
+  @Test
+  void testBuildCacheKeyCatalog() {
+    MetadataObject catalog = singleName("cat1", MetadataObject.Type.CATALOG);
+    Assertions.assertEquals(key("ml1", "cat1", ""), cacheKeyOf(catalog));
+  }
+
+  @Test
+  void testBuildCacheKeySchema() {
+    MetadataObject schema = multiName(MetadataObject.Type.SCHEMA, "cat1", "sch1");
+    Assertions.assertEquals(key("ml1", "cat1", "sch1", ""), cacheKeyOf(schema));
+  }
+
+  @Test
+  void testBuildCacheKeyLeafTypesGetTypeSuffix() {
+    MetadataObject table = multiName(MetadataObject.Type.TABLE, "cat1", "sch1", "tbl1");
+    Assertions.assertEquals(key("ml1", "cat1", "sch1", "tbl1", "TABLE"), cacheKeyOf(table));
+
+    MetadataObject view = multiName(MetadataObject.Type.VIEW, "cat1", "sch1", "v1");
+    Assertions.assertEquals(key("ml1", "cat1", "sch1", "v1", "VIEW"), cacheKeyOf(view));
+
+    MetadataObject fileset = multiName(MetadataObject.Type.FILESET, "cat1", "sch1", "fs1");
+    Assertions.assertEquals(key("ml1", "cat1", "sch1", "fs1", "FILESET"), cacheKeyOf(fileset));
+  }
+
+  // ---------- isContainerType ----------
+
+  @Test
+  void testIsContainerTypeContainerTypes() {
+    for (MetadataObject.Type containerType :
+        Arrays.asList(
+            MetadataObject.Type.METALAKE,
+            MetadataObject.Type.CATALOG,
+            MetadataObject.Type.SCHEMA)) {
+      Assertions.assertTrue(JcasbinAuthorizationLookups.isContainerType(containerType));
+    }
+  }
+
+  @Test
+  void testIsContainerTypeLeafTypes() {
+    for (MetadataObject.Type leafType :
+        Arrays.asList(
+            MetadataObject.Type.TABLE,
+            MetadataObject.Type.VIEW,
+            MetadataObject.Type.FILESET,
+            MetadataObject.Type.TOPIC)) {
+      Assertions.assertFalse(JcasbinAuthorizationLookups.isContainerType(leafType));
+    }
+  }
+
+  // ---------- Prefix invalidation ----------
+
+  @Test
+  void testPrefixInvalidationCoversContainerPath() {
+    // Dropping a catalog should use a prefix that covers all schemas and tables below it.
+    String catalogKey = cacheKeyOf(singleName("cat1", MetadataObject.Type.CATALOG));
+    String schemaKey = cacheKeyOf(multiName(MetadataObject.Type.SCHEMA, "cat1", "sch1"));
+    String tableKey = cacheKeyOf(multiName(MetadataObject.Type.TABLE, "cat1", "sch1", "tbl1"));
+
+    Assertions.assertTrue(schemaKey.startsWith(catalogKey));
+    Assertions.assertTrue(tableKey.startsWith(catalogKey));
+    Assertions.assertTrue(tableKey.startsWith(schemaKey));
+  }
+
+  // ---------- Shared-cache lookups ----------
+
+  @Test
+  void testResolveMetadataIdUsesAtomicSharedCacheAndRequestDedup() {
+    MetadataObject table = multiName(MetadataObject.Type.TABLE, "cat1", "sch1", "tbl1");
+    CountingCache<String, Long> metadataIdCache = new CountingCache<>(100L);
+    CountingCache<Long, Optional<OwnerInfo>> ownerRelCache =
+        new CountingCache<>(Optional.empty());
+    JcasbinAuthorizationLookups lookups =
+        new JcasbinAuthorizationLookups(metadataIdCache, ownerRelCache);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+
+    // Two lookups within the same request context.
+    for (int attempt = 0; attempt < 2; attempt++) {
+      Assertions.assertEquals(100L, lookups.resolveMetadataId(table, "ml1", requestContext));
+    }
+
+    // Only the loading get() was used, and only once.
+    Assertions.assertEquals(1, metadataIdCache.getCount);
+    Assertions.assertEquals(0, metadataIdCache.getIfPresentCount);
+    Assertions.assertEquals(0, metadataIdCache.putCount);
+  }
+
+  @Test
+  void testResolveOwnerIdUsesAtomicSharedCacheAndRequestDedup() {
+    CountingCache<String, Long> metadataIdCache = new CountingCache<>(100L);
+    CountingCache<Long, Optional<OwnerInfo>> ownerRelCache =
+        new CountingCache<>(Optional.empty());
+    JcasbinAuthorizationLookups lookups =
+        new JcasbinAuthorizationLookups(metadataIdCache, ownerRelCache);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+
+    // Two lookups within the same request context.
+    for (int attempt = 0; attempt < 2; attempt++) {
+      Assertions.assertFalse(
+          lookups.resolveOwnerId(100L, MetadataObject.Type.TABLE, requestContext).isPresent());
+    }
+
+    // Only the loading get() was used, and only once.
+    Assertions.assertEquals(1, ownerRelCache.getCount);
+    Assertions.assertEquals(0, ownerRelCache.getIfPresentCount);
+    Assertions.assertEquals(0, ownerRelCache.putCount);
+  }
+
+  // ---------- helpers ----------
+
+  /** Joins {@code parts} with the lookup facade's key separator. */
+  private static String key(String... parts) {
+    return String.join(JcasbinAuthorizationLookups.KEY_SEP, parts);
+  }
+
+  /** Builds the cache key of {@code object} inside the test metalake. */
+  private static String cacheKeyOf(MetadataObject object) {
+    return JcasbinAuthorizationLookups.buildCacheKey(METALAKE, object);
+  }
+
+  /** Creates a metadata object whose name consists of a single segment. */
+  private static MetadataObject singleName(String name, MetadataObject.Type type) {
+    return MetadataObjects.of(Collections.singletonList(name), type);
+  }
+
+  /** Creates a metadata object from several name segments. */
+  private static MetadataObject multiName(MetadataObject.Type type, String... names) {
+    return MetadataObjects.of(Arrays.asList(names), type);
+  }
+
+  /**
+   * Cache stub that counts calls. {@code get} always returns the fixed value without invoking
+   * the loader, {@code getIfPresent} always misses, and the remaining operations do nothing.
+   */
+  private static class CountingCache<K, V> implements GravitinoCache<K, V> {
+    private final V value;
+    private int getCount;
+    private int getIfPresentCount;
+    private int putCount;
+
+    private CountingCache(V value) {
+      this.value = value;
+    }
+
+    @Override
+    public Optional<V> getIfPresent(K key) {
+      getIfPresentCount++;
+      return Optional.empty();
+    }
+
+    @Override
+    public V get(K key, Function<K, V> loader) {
+      getCount++;
+      return value;
+    }
+
+    @Override
+    public void put(K key, V value) {
+      putCount++;
+    }
+
+    @Override
+    public void invalidate(K key) {}
+
+    @Override
+    public void invalidateAll() {}
+
+    @Override
+    public void invalidateByPrefix(String prefix) {}
+
+    @Override
+    public long size() {
+      return 0;
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index afa48efc98..3e65d4fb4a 100644
---
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -24,8 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@@ -37,10 +38,15 @@ import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.lang.reflect.Field;
import java.security.Principal;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Entity;
@@ -56,6 +62,7 @@ import org.apache.gravitino.UserPrincipal;
import org.apache.gravitino.authorization.AuthorizationRequestContext;
import org.apache.gravitino.authorization.Privilege;
import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.cache.GravitinoCache;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.GroupEntity;
@@ -64,15 +71,19 @@ import org.apache.gravitino.meta.SchemaVersion;
import org.apache.gravitino.meta.UserEntity;
import org.apache.gravitino.server.ServerConfig;
import org.apache.gravitino.server.authorization.MetadataIdConverter;
-import
org.apache.gravitino.server.authorization.jcasbin.JcasbinAuthorizer.OwnerInfo;
+import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
import org.apache.gravitino.storage.relational.service.OwnerMetaService;
import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
import org.apache.gravitino.utils.PrincipalUtils;
import org.casbin.jcasbin.main.Enforcer;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -114,6 +125,12 @@ public class TestJcasbinAuthorizer {
private static MockedStatic<OwnerMetaService> ownerMetaServiceMockedStatic;
+ private static MockedStatic<SessionUtils> sessionUtilsMockedStatic;
+
+ private static OwnerMetaMapper ownerMetaMapper = mock(OwnerMetaMapper.class);
+
+ private static EntityChangeLogMapper entityChangeLogMapper =
mock(EntityChangeLogMapper.class);
+
private static JcasbinAuthorizer jcasbinAuthorizer;
private static ObjectMapper objectMapper = new ObjectMapper();
@@ -123,6 +140,31 @@ public class TestJcasbinAuthorizer {
OwnerMetaService ownerMetaService = mock(OwnerMetaService.class);
ownerMetaServiceMockedStatic = mockStatic(OwnerMetaService.class);
ownerMetaServiceMockedStatic.when(OwnerMetaService::getInstance).thenReturn(ownerMetaService);
+ when(ownerMetaMapper.selectMaxChangeId()).thenReturn(0L);
+
when(ownerMetaMapper.selectChangedOwners(anyLong())).thenReturn(Collections.emptyList());
+ when(entityChangeLogMapper.selectMaxChangeId()).thenReturn(0L);
+ when(entityChangeLogMapper.selectEntityChanges(anyLong(), anyInt()))
+ .thenReturn(Collections.emptyList());
+
+ // The change poller probes entity_change_log + owner_meta on startup and
owner lookups go via
+ // SessionUtils; mock SessionUtils to delegate to mapper mocks so tests
can stub owner state
+ // without opening a real MyBatis session. Poller-only mapper calls return
safe empty defaults.
+ sessionUtilsMockedStatic = mockStatic(SessionUtils.class);
+ sessionUtilsMockedStatic
+ .when(() -> SessionUtils.getWithoutCommit(any(), any()))
+ .thenAnswer(
+ invocation -> {
+ Class<?> mapperClass = invocation.getArgument(0);
+ Function<Object, Object> func = invocation.getArgument(1);
+ if (mapperClass == OwnerMetaMapper.class) {
+ return func.apply(ownerMetaMapper);
+ }
+ if (mapperClass == EntityChangeLogMapper.class) {
+ return func.apply(entityChangeLogMapper);
+ }
+ return null;
+ });
+
gravitinoEnvMockedStatic = mockStatic(GravitinoEnv.class);
gravitinoEnvMockedStatic.when(GravitinoEnv::getInstance).thenReturn(gravitinoEnv);
when(gravitinoEnv.config()).thenReturn(new ServerConfig());
@@ -161,6 +203,13 @@ public class TestJcasbinAuthorizer {
@AfterAll
public static void stop() {
+ if (jcasbinAuthorizer != null) {
+ try {
+ jcasbinAuthorizer.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to close JcasbinAuthorizer", e);
+ }
+ }
if (principalUtilsMockedStatic != null) {
principalUtilsMockedStatic.close();
}
@@ -170,6 +219,9 @@ public class TestJcasbinAuthorizer {
if (ownerMetaServiceMockedStatic != null) {
ownerMetaServiceMockedStatic.close();
}
+ if (sessionUtilsMockedStatic != null) {
+ sessionUtilsMockedStatic.close();
+ }
if (gravitinoEnvMockedStatic != null) {
gravitinoEnvMockedStatic.close();
}
@@ -258,23 +310,23 @@ public class TestJcasbinAuthorizer {
@Test
public void testAuthorizeByOwner() throws Exception {
Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
- assertFalse(doAuthorizeOwner(currentPrincipal));
NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE,
"testCatalog");
- List<UserEntity> owners = ImmutableList.of(getUserEntity());
- doReturn(owners)
- .when(supportsRelationOperations)
- .listEntitiesByRelation(
- eq(SupportsRelationOperations.Type.OWNER_REL),
- eq(catalogIdent),
- eq(Entity.EntityType.CATALOG));
+
+ // No owner set — should fail
+ when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID),
eq("CATALOG")))
+ .thenReturn(null);
+ getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
+ assertFalse(doAuthorizeOwner(currentPrincipal));
+
+ // Set owner to current user
+ when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID),
eq("CATALOG")))
+ .thenReturn(new OwnerInfo(USER_ID, "USER"));
getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
assertTrue(doAuthorizeOwner(currentPrincipal));
- doReturn(new ArrayList<>())
- .when(supportsRelationOperations)
- .listEntitiesByRelation(
- eq(SupportsRelationOperations.Type.OWNER_REL),
- eq(catalogIdent),
- eq(Entity.EntityType.CATALOG));
+
+ // Remove owner via change hook
+ when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID),
eq("CATALOG")))
+ .thenReturn(null);
jcasbinAuthorizer.handleMetadataOwnerChange(
METALAKE, USER_ID, catalogIdent, Entity.EntityType.CATALOG);
assertFalse(doAuthorizeOwner(currentPrincipal));
@@ -311,26 +363,18 @@ public class TestJcasbinAuthorizer {
.withAuditInfo(AuditInfo.EMPTY)
.build()));
- // Mock owner relation returning a GroupEntity
- List<GroupEntity> owners = ImmutableList.of(getGroupEntity());
- doReturn(owners)
- .when(supportsRelationOperations)
- .listEntitiesByRelation(
- eq(SupportsRelationOperations.Type.OWNER_REL),
- eq(catalogIdent),
- eq(Entity.EntityType.CATALOG));
+ // Mock owner_meta returning a GROUP-typed owner with GROUP_ID
+ OwnerInfo groupOwnerInfo = new OwnerInfo(GROUP_ID, "GROUP");
+ when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID),
eq("CATALOG")))
+ .thenReturn(groupOwnerInfo);
getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
// The principal belongs to the owning group, so isOwner should return true
assertTrue(doAuthorizeOwner(groupPrincipal));
// Clear owner and verify it returns false
- doReturn(new ArrayList<>())
- .when(supportsRelationOperations)
- .listEntitiesByRelation(
- eq(SupportsRelationOperations.Type.OWNER_REL),
- eq(catalogIdent),
- eq(Entity.EntityType.CATALOG));
+ when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID),
eq("CATALOG")))
+ .thenReturn(null);
jcasbinAuthorizer.handleMetadataOwnerChange(
METALAKE, GROUP_ID, catalogIdent, Entity.EntityType.CATALOG);
assertFalse(doAuthorizeOwner(groupPrincipal));
@@ -342,13 +386,8 @@ public class TestJcasbinAuthorizer {
principalUtilsMockedStatic
.when(PrincipalUtils::getCurrentPrincipal)
.thenReturn(nonMemberPrincipal);
- // Re-populate the owner cache with the group owner
- doReturn(ImmutableList.of(getGroupEntity()))
- .when(supportsRelationOperations)
- .listEntitiesByRelation(
- eq(SupportsRelationOperations.Type.OWNER_REL),
- eq(catalogIdent),
- eq(Entity.EntityType.CATALOG));
+ when(ownerMetaMapper.selectOwnerByMetadataObjectIdAndType(eq(CATALOG_ID),
eq("CATALOG")))
+ .thenReturn(groupOwnerInfo);
getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
assertFalse(doAuthorizeOwner(nonMemberPrincipal));
@@ -984,14 +1023,13 @@ public class TestJcasbinAuthorizer {
@Test
public void testOwnerCacheInvalidation() throws Exception {
- // Get the ownerRel cache via reflection
- Cache<Long, Optional<OwnerInfo>> ownerRel =
getOwnerRelCache(jcasbinAuthorizer);
+ GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache =
getOwnerRelCache(jcasbinAuthorizer);
// Manually add an owner relation to the cache
- ownerRel.put(CATALOG_ID, Optional.of(new OwnerInfo(USER_ID,
Entity.EntityType.USER, USERNAME)));
+ ownerRelCache.put(CATALOG_ID, Optional.of(new OwnerInfo(USER_ID, "USER")));
// Verify it's in the cache
- assertNotNull(ownerRel.getIfPresent(CATALOG_ID));
+ assertTrue(ownerRelCache.getIfPresent(CATALOG_ID).isPresent());
// Create a mock NameIdentifier for the metadata object
NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE,
"testCatalog");
@@ -1001,7 +1039,74 @@ public class TestJcasbinAuthorizer {
METALAKE, USER_ID, catalogIdent, Entity.EntityType.CATALOG);
// Verify it's removed from the cache
- assertNull(ownerRel.getIfPresent(CATALOG_ID));
+ assertFalse(ownerRelCache.getIfPresent(CATALOG_ID).isPresent());
+ }
+
+ /**
+  * When the metadata-id lookup throws, {@code handleMetadataOwnerChange} must not propagate the
+  * failure: the cached metadata-id entry is gone afterwards, while the owner entry is still
+  * present.
+  */
+ @Test
+ public void testOwnerChangeBestEffortWhenMetadataIdLookupFails() throws Exception {
+   NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE, "testCatalog");
+   String cacheKey =
+       JcasbinAuthorizationLookups.buildCacheKey(
+           METALAKE, NameIdentifierUtil.toMetadataObject(catalogIdent, Entity.EntityType.CATALOG));
+
+   // Seed both caches so the effect of the hook on each of them can be observed.
+   GravitinoCache<String, Long> idCache = getMetadataIdCache(jcasbinAuthorizer);
+   GravitinoCache<Long, Optional<OwnerInfo>> ownerCache = getOwnerRelCache(jcasbinAuthorizer);
+   idCache.put(cacheKey, CATALOG_ID);
+   ownerCache.put(CATALOG_ID, Optional.of(new OwnerInfo(USER_ID, "USER")));
+
+   metadataIdConverterMockedStatic
+       .when(() -> MetadataIdConverter.getID(any(), eq(METALAKE)))
+       .thenThrow(new RuntimeException("lookup failed"));
+
+   try {
+     Assertions.assertDoesNotThrow(
+         () ->
+             jcasbinAuthorizer.handleMetadataOwnerChange(
+                 METALAKE, USER_ID, catalogIdent, Entity.EntityType.CATALOG));
+
+     assertFalse(idCache.getIfPresent(cacheKey).isPresent());
+     assertTrue(ownerCache.getIfPresent(CATALOG_ID).isPresent());
+   } finally {
+     // Put a working stub back so later tests are not affected by the failing one.
+     metadataIdConverterMockedStatic
+         .when(() -> MetadataIdConverter.getID(any(), eq(METALAKE)))
+         .thenReturn(CATALOG_ID);
+   }
+ }
+
+ @Test
+ public void testCloseAwaitsRoleLoadExecutorTermination() throws Exception {
+ RecordingThreadPoolExecutor executor = new RecordingThreadPoolExecutor();
+
+ Field executorField = JcasbinAuthorizer.class.getDeclaredField("executor");
+ Field changePollerField =
JcasbinAuthorizer.class.getDeclaredField("changePoller");
+ Field metadataIdCacheField =
JcasbinAuthorizer.class.getDeclaredField("metadataIdCache");
+ Field ownerRelCacheField =
JcasbinAuthorizer.class.getDeclaredField("ownerRelCache");
+
+ Executor originalExecutor =
+ (Executor) FieldUtils.readField(executorField, jcasbinAuthorizer,
true);
+ Object originalPoller = FieldUtils.readField(changePollerField,
jcasbinAuthorizer, true);
+ Object originalMetadataIdCache =
+ FieldUtils.readField(metadataIdCacheField, jcasbinAuthorizer, true);
+ Object originalOwnerRelCache =
+ FieldUtils.readField(ownerRelCacheField, jcasbinAuthorizer, true);
+
+ try {
+ FieldUtils.writeField(executorField, jcasbinAuthorizer, executor, true);
+ // Detach shared resources so close() only exercises the executor
shutdown branch and does
+ // not leave the shared poller / caches in a closed state for subsequent
tests.
+ FieldUtils.writeField(changePollerField, jcasbinAuthorizer, null, true);
+ FieldUtils.writeField(metadataIdCacheField, jcasbinAuthorizer, null,
true);
+ FieldUtils.writeField(ownerRelCacheField, jcasbinAuthorizer, null, true);
+
+ jcasbinAuthorizer.close();
+
+ assertTrue(executor.shutdownCalled.get());
+ assertTrue(executor.awaitTerminationCalled.get());
+ } finally {
+ FieldUtils.writeField(executorField, jcasbinAuthorizer,
originalExecutor, true);
+ FieldUtils.writeField(changePollerField, jcasbinAuthorizer,
originalPoller, true);
+ FieldUtils.writeField(metadataIdCacheField, jcasbinAuthorizer,
originalMetadataIdCache, true);
+ FieldUtils.writeField(ownerRelCacheField, jcasbinAuthorizer,
originalOwnerRelCache, true);
+ }
}
@Test
@@ -1043,10 +1148,10 @@ public class TestJcasbinAuthorizer {
public void testCacheInitialization() throws Exception {
// Verify that caches are initialized
Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
- Cache<Long, Optional<OwnerInfo>> ownerRel =
getOwnerRelCache(jcasbinAuthorizer);
+ GravitinoCache<Long, Optional<OwnerInfo>> ownerRelCache =
getOwnerRelCache(jcasbinAuthorizer);
assertNotNull(loadedRoles, "loadedRoles cache should be initialized");
- assertNotNull(ownerRel, "ownerRel cache should be initialized");
+ assertNotNull(ownerRelCache, "ownerRelCache should be initialized");
}
/** Tests {@link JcasbinAuthorizer#hasMetadataPrivilegePermission} hierarchy
walk */
@@ -1199,11 +1304,19 @@ public class TestJcasbinAuthorizer {
}
@SuppressWarnings("unchecked")
- private static Cache<Long, Optional<OwnerInfo>>
getOwnerRelCache(JcasbinAuthorizer authorizer)
+ private static GravitinoCache<Long, Optional<OwnerInfo>> getOwnerRelCache(
+ JcasbinAuthorizer authorizer) throws Exception {
+ Field field = JcasbinAuthorizer.class.getDeclaredField("ownerRelCache");
+ field.setAccessible(true);
+ return (GravitinoCache<Long, Optional<OwnerInfo>>) field.get(authorizer);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static GravitinoCache<String, Long>
getMetadataIdCache(JcasbinAuthorizer authorizer)
throws Exception {
- Field field = JcasbinAuthorizer.class.getDeclaredField("ownerRel");
+ Field field = JcasbinAuthorizer.class.getDeclaredField("metadataIdCache");
field.setAccessible(true);
- return (Cache<Long, Optional<OwnerInfo>>) field.get(authorizer);
+ return (GravitinoCache<String, Long>) field.get(authorizer);
}
private static Enforcer getAllowEnforcer(JcasbinAuthorizer authorizer)
throws Exception {
@@ -1217,4 +1330,25 @@ public class TestJcasbinAuthorizer {
field.setAccessible(true);
return (Enforcer) field.get(authorizer);
}
+
+ private static class RecordingThreadPoolExecutor extends ThreadPoolExecutor {
+ private final AtomicBoolean shutdownCalled = new AtomicBoolean();
+ private final AtomicBoolean awaitTerminationCalled = new AtomicBoolean();
+
+ private RecordingThreadPoolExecutor() {
+ super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+ }
+
+ @Override
+ public void shutdown() {
+ shutdownCalled.set(true);
+ super.shutdown();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ awaitTerminationCalled.set(true);
+ return super.awaitTermination(timeout, unit);
+ }
+ }
}
diff --git
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinChangePoller.java
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinChangePoller.java
new file mode 100644
index 0000000000..867a862603
--- /dev/null
+++
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinChangePoller.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.cache.GravitinoCache;
+import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.apache.gravitino.storage.relational.po.cache.EntityChangeRecord;
+import org.apache.gravitino.storage.relational.po.cache.OperateType;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+/**
+ * Unit tests for {@link JcasbinChangePoller}: poll-interval validation, translation of change-log
+ * full names into cache keys, invalidation coalescing in {@code pollChanges()}, and the
+ * {@code synchronized} modifier on the two polling methods.
+ */
+public class TestJcasbinChangePoller {
+
+  /** The constructor must refuse a poll interval of zero and a negative poll interval. */
+  @Test
+  void testRejectsNonPositivePollInterval() {
+    RecordingCache<String, Long> idCache = new RecordingCache<>();
+    RecordingCache<Long, Optional<OwnerInfo>> ownerCache = new RecordingCache<>();
+
+    // Zero is checked first, then the negative value.
+    for (int rejectedInterval : new int[] {0, -1}) {
+      Assertions.assertThrows(
+          IllegalArgumentException.class,
+          () -> new JcasbinChangePoller(idCache, ownerCache, rejectedInterval));
+    }
+  }
+
+  /** Catalog, schema and table full names carry the metalake as their first segment. */
+  @Test
+  void testChangeLogFullNameStripsLeadingMetalakeForChildTypes() {
+    assertCacheKey(key("ml1", "cat1", ""), "ml1.cat1", MetadataObject.Type.CATALOG);
+    assertCacheKey(key("ml1", "cat1", "sch1", ""), "ml1.cat1.sch1", MetadataObject.Type.SCHEMA);
+    assertCacheKey(
+        key("ml1", "cat1", "sch1", "tbl1", "TABLE"),
+        "ml1.cat1.sch1.tbl1",
+        MetadataObject.Type.TABLE);
+  }
+
+  /** A metalake record uses the metalake name itself as its full name. */
+  @Test
+  void testChangeLogFullNameForMetalakeKeepsItself() {
+    assertCacheKey(key("ml1", ""), "ml1", MetadataObject.Type.METALAKE);
+  }
+
+  /**
+   * Feeds a single poll four ALTER records (catalog {@code ml1.cat1}, a schema and a table below
+   * it, and a table below {@code ml1.cat2}) and expects exactly one prefix invalidation for the
+   * {@code cat1} key plus one exact-key invalidation for the {@code cat2} table.
+   */
+  @Test
+  void testPollEntityChangesCoalescesContainerPrefixes() {
+    RecordingCache<String, Long> idCache = new RecordingCache<>();
+    RecordingCache<Long, Optional<OwnerInfo>> ownerCache = new RecordingCache<>();
+
+    OwnerMetaMapper ownerMapper = mock(OwnerMetaMapper.class);
+    when(ownerMapper.selectChangedOwners(0L)).thenReturn(Collections.emptyList());
+
+    EntityChangeLogMapper changeLogMapper = mock(EntityChangeLogMapper.class);
+    when(changeLogMapper.selectEntityChanges(0L, 500))
+        .thenReturn(
+            List.of(
+                change(1L, MetadataObject.Type.CATALOG, "ml1.cat1"),
+                change(2L, MetadataObject.Type.SCHEMA, "ml1.cat1.sch1"),
+                change(3L, MetadataObject.Type.TABLE, "ml1.cat1.sch1.tbl1"),
+                change(4L, MetadataObject.Type.TABLE, "ml1.cat2.sch1.tbl1")));
+
+    try (MockedStatic<SessionUtils> mockedSessions = mockStatic(SessionUtils.class)) {
+      // Route every SessionUtils.getWithoutCommit call to the mock mapper of the requested type.
+      mockedSessions
+          .when(() -> SessionUtils.getWithoutCommit(any(), any()))
+          .thenAnswer(
+              invocation -> {
+                Class<?> requestedMapper = invocation.getArgument(0);
+                Function<Object, Object> callback = invocation.getArgument(1);
+                Object mapper;
+                if (requestedMapper == OwnerMetaMapper.class) {
+                  mapper = ownerMapper;
+                } else if (requestedMapper == EntityChangeLogMapper.class) {
+                  mapper = changeLogMapper;
+                } else {
+                  return null;
+                }
+                return callback.apply(mapper);
+              });
+
+      new JcasbinChangePoller(idCache, ownerCache, 1).pollChanges();
+    }
+
+    Assertions.assertEquals(List.of(key("ml1", "cat1", "")), idCache.prefixInvalidations);
+    Assertions.assertEquals(
+        List.of(key("ml1", "cat2", "sch1", "tbl1", "TABLE")), idCache.keyInvalidations);
+  }
+
+  /** Both polling methods are expected to be declared {@code synchronized}. */
+  @Test
+  void testPollCursorAdvancementIsSynchronized() throws NoSuchMethodException {
+    // Resolve both methods up front so a missing method surfaces before any assertion runs.
+    Method ownerPoll = JcasbinChangePoller.class.getDeclaredMethod("pollOwnerChanges");
+    Method entityPoll = JcasbinChangePoller.class.getDeclaredMethod("pollEntityChanges");
+
+    for (Method pollMethod : List.of(ownerPoll, entityPoll)) {
+      Assertions.assertTrue(Modifier.isSynchronized(pollMethod.getModifiers()));
+    }
+  }
+
+  /**
+   * Converts a change-log full name into a {@link MetadataObject} under metalake {@code ml1} and
+   * asserts that the cache key built for it equals {@code expectedKey}.
+   */
+  private static void assertCacheKey(
+      String expectedKey, String changeLogFullName, MetadataObject.Type type) {
+    MetadataObject parsed =
+        JcasbinChangePoller.metadataObjectFromChangeLog("ml1", changeLogFullName, type);
+    Assertions.assertEquals(expectedKey, JcasbinAuthorizationLookups.buildCacheKey("ml1", parsed));
+  }
+
+  /** Joins the given segments with {@link JcasbinAuthorizationLookups#KEY_SEP}. */
+  private static String key(String... parts) {
+    return String.join(JcasbinAuthorizationLookups.KEY_SEP, parts);
+  }
+
+  /** Builds an ALTER change record in metalake {@code ml1} for the given id, type and name. */
+  private static EntityChangeRecord change(long id, MetadataObject.Type type, String fullName) {
+    String typeName = type.name();
+    return new EntityChangeRecord(id, "ml1", typeName, fullName, OperateType.ALTER, 0L);
+  }
+
+  /**
+   * {@link GravitinoCache} stub that stores nothing and only records, in call order, the keys and
+   * prefixes it was asked to invalidate.
+   */
+  private static class RecordingCache<K, V> implements GravitinoCache<K, V> {
+    private final List<String> prefixInvalidations = new ArrayList<>();
+    private final List<K> keyInvalidations = new ArrayList<>();
+
+    @Override
+    public Optional<V> getIfPresent(K key) {
+      // Always a miss: nothing is ever stored.
+      return Optional.empty();
+    }
+
+    @Override
+    public void put(K key, V value) {}
+
+    @Override
+    public void invalidate(K key) {
+      keyInvalidations.add(key);
+    }
+
+    @Override
+    public void invalidateAll() {}
+
+    @Override
+    public void invalidateByPrefix(String prefix) {
+      prefixInvalidations.add(prefix);
+    }
+
+    @Override
+    public long size() {
+      return 0L;
+    }
+
+    @Override
+    public void close() {}
+  }
+}