sunxiaojian commented on code in PR #8452: URL: https://github.com/apache/gravitino/pull/8452#discussion_r2332122368
########## core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java: ########## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.cache; + +import com.google.common.util.concurrent.Striped; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; + +/** + * Segmented lock for improved concurrency. Divides locks into segments to reduce contention. + * Supports global clearing operations that require exclusive access to all segments. + */ +public class SegmentedLock { + private final Striped<Lock> stripedLocks; + private static final Object NULL_KEY = new Object(); + + /** CountDownLatch for global operations - null when no operation is in progress */ + private final AtomicReference<CountDownLatch> globalOperationLatch = new AtomicReference<>(); + + /** + * Creates a SegmentedLock with the specified number of segments. Guava's Striped automatically + * rounds up to the nearest power of 2 for optimal performance. + * + * @param numSegments Number of segments (must be positive) + * @throws IllegalArgumentException if numSegments is not positive + */ + public SegmentedLock(int numSegments) { + if (numSegments <= 0) { + throw new IllegalArgumentException( + "Number of segments must be positive, got: " + numSegments); + } + + this.stripedLocks = Striped.lock(numSegments); + } + + /** + * Gets the segment lock for the given key. + * + * @param key Object to determine the segment + * @return Segment lock for the key + */ + public Lock getSegmentLock(Object key) { + return stripedLocks.get(normalizeKey(key)); + } + + /** + * Normalizes the key to handle null values consistently. + * + * @param key The input key + * @return Normalized key (never null) + */ + private Object normalizeKey(Object key) { + return key != null ? key : NULL_KEY; + } + + /** + * Runs action with segment lock for the given key. Will wait if a global clearing operation is in + * progress. + * + * @param key Key to determine segment + * @param action Action to run + * @throws RuntimeException if interrupted + */ + public void withLock(Object key, Runnable action) { + waitForGlobalComplete(); + Lock lock = getSegmentLock(key); + try { + lock.lockInterruptibly(); + action.run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted while waiting for lock", e); + } finally { + lock.unlock(); + } + } + + /** + * Runs action with segment lock and returns result. Will wait if a global clearing operation is + * in progress. + * + * @param key Key to determine segment + * @param action Action to run + * @param <T> Result type + * @return Action result + * @throws RuntimeException if interrupted + */ + public <T> T withLock(Object key, java.util.function.Supplier<T> action) { + waitForGlobalComplete(); + Lock lock = getSegmentLock(key); + try { + lock.lockInterruptibly(); + return action.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted while waiting for lock", e); + } finally { + lock.unlock(); + } + } + + /** + * Runs action with segment lock for the given key. Will wait if a global clearing operation is in + * progress. + * + * @param key Key to determine segment + * @param action Action to run + * @param <T> Result type + * @param <E> Exception type + * @return Action result + * @throws E Exception + */ + public <T, E extends Exception> T withLockAndThrow( + Object key, EntityCache.ThrowingSupplier<T, E> action) throws E { + waitForGlobalComplete(); + Lock lock = getSegmentLock(key); + try { + lock.lockInterruptibly(); + return action.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Thread was interrupted while waiting for lock", e); + } finally { + lock.unlock(); + } + } + + /** + * Runs action with segment lock for the given key. Will wait if a global clearing operation is in + * progress. + * + * @param key Key to determine segment + * @param action Action to run + * @param <E> Exception type + * @throws E Exception + */ + public <E extends Exception> void withLockAndThrow( + Object key, EntityCache.ThrowingRunnable<E> action) throws E { + waitForGlobalComplete(); Review Comment: @yuqi1129 The global lock was intended for `clear()` ########## core/src/main/java/org/apache/gravitino/Configs.java: ########## @@ -408,6 +408,15 @@ private Configs() {} .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) .createWithDefault("caffeine"); + // Number of lock segments for cache concurrency optimization + public static final ConfigEntry<Integer> CACHE_LOCK_SEGMENTS = + new ConfigBuilder("gravitino.cache.lockSegments") + .doc("Number of lock segments for cache concurrency optimization.") Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
