This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 0484847d792a842de9f34601274cc3abada0990b Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Mar 19 02:30:47 2025 -0700 RATIS-2258. Caching TermIndex objects (#1239) --- .../org/apache/ratis/util/BiWeakValueCache.java | 144 +++++++++++++++++++++ .../apache/ratis/server/protocol/TermIndex.java | 92 +++++++------ .../test/java/org/apache/ratis/RaftTestUtil.java | 15 +++ .../ratis/server/protocol/ProtocolTestUtils.java | 26 ++++ .../java/org/apache/ratis/util/TestTermIndex.java | 100 ++++++++++++++ 5 files changed, 341 insertions(+), 36 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java b/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java new file mode 100644 index 000000000..c1aa6bcd5 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.util; + +import org.apache.ratis.thirdparty.com.google.common.collect.MapMaker; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +/** + * Weak Value Cache: ({@link OUTER}, {@link INNER}) -> {@link T}. + * <p> + * Note that the cached values are weakly referenced. + * A cached value could be garbage-collected (i.e. evicted from the cache) + * when there are no external (strong) references. + * + * @param <OUTER> the type of the outer keys. + * @param <INNER> the type of the inner keys. + * @param <T> the type to be cached. + */ +public final class BiWeakValueCache<OUTER, INNER, T> { + private static <K, V> ConcurrentMap<K, V> newMap() { + return new MapMaker().weakValues().makeMap(); + } + + private final String outerName; + private final String innerName; + private final String name; + + /** For constructing {@link T} values from ({@link OUTER}, {@link INNER}) keys. */ + private final BiFunction<OUTER, INNER, T> constructor; + /** Count the number of {@link T} values constructed. */ + private final AtomicInteger valueCount = new AtomicInteger(0); + + /** + * Actual map {@link OUTER} -> ({@link INNER} -> {@link T}) + * for the logical view ({@link OUTER}, {@link INNER}) -> {@link T}. + */ + private final ConcurrentMap<OUTER, ConcurrentMap<INNER, T>> map = new ConcurrentHashMap<>(); + + /** + * Create a cache for mapping ({@link OUTER}, {@link INNER}) keys to {@link T} values. + * + * @param outerName the name of the outer keys. + * @param innerName the name of the inner keys. + * @param constructor for constructing {@link T} values. 
+ */ + public BiWeakValueCache(String outerName, String innerName, BiFunction<OUTER, INNER, T> constructor) { + this.outerName = outerName; + this.innerName = innerName; + this.name = "(" + outerName + ", " + innerName + ")-cache"; + this.constructor = constructor; + } + + private T construct(OUTER outer, INNER inner) { + final T constructed = constructor.apply(outer, inner); + Objects.requireNonNull(constructed, "constructed == null"); + valueCount.incrementAndGet(); + return constructed; + } + + /** + * If the key ({@link OUTER}, {@link INNER}) is in the cache, return the cached values. + * Otherwise, create a new value and then return it. + */ + public T getOrCreate(OUTER outer, INNER inner) { + Objects.requireNonNull(outer, () -> outerName + " (outer) == null"); + Objects.requireNonNull(inner, () -> innerName + " (inner) == null"); + final ConcurrentMap<INNER, T> innerMap = map.computeIfAbsent(outer, k -> newMap()); + final T computed = innerMap.computeIfAbsent(inner, i -> construct(outer, i)); + if ((valueCount.get() & 0xFFF) == 0) { + cleanupEmptyInnerMaps(); // cleanup empty maps once in a while + } + return computed; + } + + /** @return the value count for the given outer key. */ + int count(OUTER outer) { + final ConcurrentMap<INNER, T> innerMap = map.get(outer); + if (innerMap == null) { + return 0; + } + + // size() may return incorrect result; see Guava MapMaker javadoc + int n = 0; + for (INNER ignored : innerMap.keySet()) { + n++; + } + return n; + } + + void cleanupEmptyInnerMaps() { + // isEmpty() may return incorrect result; see Guava MapMaker javadoc + map.values().removeIf(e -> !e.entrySet().iterator().hasNext()); + } + + @Override + public String toString() { + return name; + } + + /** The cache content for debugging. 
+ */ + int dump(Consumer<String> out) { + out.accept(name + ":\n"); + int emptyCount = 0; + for (Map.Entry<OUTER, ConcurrentMap<INNER, T>> entry : map.entrySet()) { + final OUTER outer = entry.getKey(); + final ConcurrentMap<INNER, T> innerMap = entry.getValue(); + final int count = count(outer); + if (count == 0) { + emptyCount++; + } + + out.accept(" " + outerName + ":" + outer); + out.accept(", " + innerName + ":" + innerMap.keySet()); + out.accept(", count=" + count); + out.accept(", size=" + innerMap.size()); + out.accept("\n"); + } + out.accept(" emptyCount=" + emptyCount); + out.accept("\n"); + return emptyCount; + } +} diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java index dac1a51d2..6115bccad 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.protocol; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.TermIndexProto; import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.util.BiWeakValueCache; import java.util.Comparator; import java.util.Optional; @@ -73,43 +74,62 @@ public interface TermIndex extends Comparable<TermIndex> { /** @return a {@link TermIndex} object. */ static TermIndex valueOf(long term, long index) { - return new TermIndex() { - @Override - public long getTerm() { - return term; - } - - @Override - public long getIndex() { - return index; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } else if (!(obj instanceof TermIndex)) { - return false; + return Impl.getCache().getOrCreate(term, index); + } + + /** + * An implementation for private use. + * Note that this is not a public API, although this is a public class. 
+ */ + final class Impl { + private Impl() { } + + private static final BiWeakValueCache<Long, Long, TermIndex> CACHE + = new BiWeakValueCache<>("term", "index", Impl::newTermIndex); + + static BiWeakValueCache<Long, Long, TermIndex> getCache() { + return CACHE; + } + + private static TermIndex newTermIndex(long term, long index) { + return new TermIndex() { + @Override + public long getTerm() { + return term; + } + + @Override + public long getIndex() { + return index; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (!(obj instanceof TermIndex)) { + return false; + } + + final TermIndex that = (TermIndex) obj; + return this.getTerm() == that.getTerm() + && this.getIndex() == that.getIndex(); + } + + @Override + public int hashCode() { + return Long.hashCode(term) ^ Long.hashCode(index); + } + + private String longToString(long n) { + return n >= 0L ? String.valueOf(n) : "~"; } - final TermIndex that = (TermIndex) obj; - return this.getTerm() == that.getTerm() - && this.getIndex() == that.getIndex(); - } - - @Override - public int hashCode() { - return Long.hashCode(term) ^ Long.hashCode(index); - } - - private String longToString(long n) { - return n >= 0L? 
String.valueOf(n) : "~"; - } - - @Override - public String toString() { - return String.format("(t:%s, i:%s)", longToString(term), longToString(index)); - } - }; + @Override + public String toString() { + return String.format("(t:%s, i:%s)", longToString(term), longToString(index)); + } + }; + } } } \ No newline at end of file diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 3c1b3eda2..206b6dd94 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.ref.WeakReference; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -590,4 +591,18 @@ public interface RaftTestUtil { Assert.assertNotNull("reply == null", reply); Assert.assertTrue("reply is not success: " + reply, reply.isSuccess()); } + + static void gc() throws InterruptedException { + // use WeakReference to detect gc + Object obj = new Object(); + final WeakReference<Object> weakRef = new WeakReference<>(obj); + obj = null; + + // loop until gc has completed. + for (int i = 0; weakRef.get() != null; i++) { + LOG.info("gc {}", i); + System.gc(); + Thread.sleep(100); + } + } } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java new file mode 100644 index 000000000..dee3f224c --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.protocol; + +import org.apache.ratis.util.BiWeakValueCache; + +public interface ProtocolTestUtils { + static BiWeakValueCache<Long, Long, TermIndex> getTermIndexCache() { + return TermIndex.Impl.getCache(); + } +} \ No newline at end of file diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java new file mode 100644 index 000000000..678d7afe6 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.util; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.server.protocol.ProtocolTestUtils; +import org.apache.ratis.server.protocol.TermIndex; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** Testing {@link BiWeakValueCache}. */ +public class TestTermIndex extends BaseTest { + static BiWeakValueCache<Long, Long, TermIndex> CACHE = ProtocolTestUtils.getTermIndexCache(); + + static void dumpCache(Integer expectedEmptyCount) { + final int computed = CACHE.dump(System.out::print); + if (expectedEmptyCount != null) { + assertEquals(expectedEmptyCount, computed); + } + System.out.flush(); + } + + static void assertCacheSize(int expectedSize, long term) { + final int computed = CACHE.count(term); + if (computed != expectedSize) { + dumpCache(null); + } + assertEquals(expectedSize, computed); + } + + void assertCacheSizeWithGC(int expectedSize, long term) throws Exception{ + JavaUtils.attempt(() -> { + RaftTestUtil.gc(); + assertCacheSize(expectedSize, term); + }, 5, HUNDRED_MILLIS, "assertCacheSizeWithGC", LOG); + } + + static void initTermIndex(TermIndex[][] ti, int term, int index) { + ti[term][index] = TermIndex.valueOf(term, index); + } + + @Test + public void testCaching() throws Exception { + final int n = 9; + final TermIndex[][] ti = new TermIndex[n][n]; + final long[] terms = new long[n]; + final long[] indices = new long[n]; + for(int j = 0; j < n; j++) { + terms[j] = j; + indices[j] = j; + } + + assertCacheSize(0, terms[1]); + initTermIndex(ti, 1, 1); + assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1])); + assertCacheSize(1, terms[1]); + + initTermIndex(ti, 1, 2); + assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1])); + assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2])); + assertCacheSize(2, terms[1]); + dumpCache(0); + + initTermIndex(ti, 2, 2); + assertSame(ti[1][1], TermIndex.valueOf(terms[1], 
indices[1])); + assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2])); + assertSame(ti[2][2], TermIndex.valueOf(terms[2], indices[2])); + assertCacheSize(2, terms[1]); + assertCacheSize(1, terms[2]); + dumpCache(0); + + ti[1][1] = null; // release ti[1][1]; + assertCacheSizeWithGC(1, terms[1]); + dumpCache(0); + + ti[1][2] = null; // release ti[1][2]; + assertCacheSizeWithGC(0, terms[1]); + dumpCache(1); + + CACHE.cleanupEmptyInnerMaps(); + dumpCache(0); + } +}
