This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ac35188a4 RATIS-2258. Caching TermIndex objects (#1239)
ac35188a4 is described below
commit ac35188a4fb285842a05224141e3879aaf71e621
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Mar 19 02:30:47 2025 -0700
RATIS-2258. Caching TermIndex objects (#1239)
---
.../org/apache/ratis/util/BiWeakValueCache.java | 144 +++++++++++++++++++++
.../apache/ratis/server/protocol/TermIndex.java | 92 +++++++------
.../ratis/server/protocol/ProtocolTestUtils.java | 26 ++++
.../java/org/apache/ratis/util/TestTermIndex.java | 100 ++++++++++++++
4 files changed, 326 insertions(+), 36 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java
b/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java
new file mode 100644
index 000000000..c1aa6bcd5
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.thirdparty.com.google.common.collect.MapMaker;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+/**
+ * Weak Value Cache: ({@link OUTER}, {@link INNER}) -> {@link T}.
+ * <p>
+ * Note that the cached values are weakly referenced.
+ * A cached value could be garage-collected (i.e. evicted from the cache)
+ * when there are no external (strong) references.
+ *
+ * @param <OUTER> the type of the outer keys.
+ * @param <INNER> the type of the inner keys.
+ * @param <T> the type to be cached.
+ */
+public final class BiWeakValueCache<OUTER, INNER, T> {
+ private static <K, V> ConcurrentMap<K, V> newMap() {
+ return new MapMaker().weakValues().makeMap();
+ }
+
+ private final String outerName;
+ private final String innerName;
+ private final String name;
+
+ /** For constructing {@link T} values from ({@link OUTER}, {@link INNER})
keys. */
+ private final BiFunction<OUTER, INNER, T> constructor;
+ /** Count the number of {@link T} values constructed. */
+ private final AtomicInteger valueCount = new AtomicInteger(0);
+
+ /**
+ * Actual map {@link OUTER} -> ({@link INNER} -> {@link T})
+ * for the logical view ({@link OUTER}, {@link INNER}) -> {@link T}.
+ */
+ private final ConcurrentMap<OUTER, ConcurrentMap<INNER, T>> map = new
ConcurrentHashMap<>();
+
+ /**
+ * Create a cache for mapping ({@link OUTER}, {@link INNER}) keys to {@link
T} values.
+ *
+ * @param outerName the name of the outer long.
+ * @param innerName the name of the inner long.
+ * @param constructor for constructing {@link T} values.
+ */
+ public BiWeakValueCache(String outerName, String innerName,
BiFunction<OUTER, INNER, T> constructor) {
+ this.outerName = outerName;
+ this.innerName = innerName;
+ this.name = "(" + outerName + ", " + innerName + ")-cache";
+ this.constructor = constructor;
+ }
+
+ private T construct(OUTER outer, INNER inner) {
+ final T constructed = constructor.apply(outer, inner);
+ Objects.requireNonNull(constructed, "constructed == null");
+ valueCount.incrementAndGet();
+ return constructed;
+ }
+
+ /**
+ * If the key ({@link OUTER}, {@link INNER}) is in the cache, return the
cached values.
+ * Otherwise, create a new value and then return it.
+ */
+ public T getOrCreate(OUTER outer, INNER inner) {
+ Objects.requireNonNull(outer, () -> outerName + " (outer) == null");
+ Objects.requireNonNull(inner, () -> innerName + " (inner) == null");
+ final ConcurrentMap<INNER, T> innerMap = map.computeIfAbsent(outer, k ->
newMap());
+ final T computed = innerMap.computeIfAbsent(inner, i -> construct(outer,
i));
+ if ((valueCount.get() & 0xFFF) == 0) {
+ cleanupEmptyInnerMaps(); // cleanup empty maps once in a while
+ }
+ return computed;
+ }
+
+ /** @return the value count for the given outer key. */
+ int count(OUTER outer) {
+ final ConcurrentMap<INNER, T> innerMap = map.get(outer);
+ if (innerMap == null) {
+ return 0;
+ }
+
+ // size() may return incorrect result; see Guava MapMaker javadoc
+ int n = 0;
+ for (INNER ignored : innerMap.keySet()) {
+ n++;
+ }
+ return n;
+ }
+
+ void cleanupEmptyInnerMaps() {
+ // isEmpty() may return incorrect result; see Guava MapMaker javadoc
+ map.values().removeIf(e -> !e.entrySet().iterator().hasNext());
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ /** The cache content for debugging. */
+ int dump(Consumer<String> out) {
+ out.accept(name + ":\n");
+ int emptyCount = 0;
+ for (Map.Entry<OUTER, ConcurrentMap<INNER, T>> entry : map.entrySet()) {
+ final OUTER outer = entry.getKey();
+ final ConcurrentMap<INNER, T> innerMap = entry.getValue();
+ final int count = count(outer);
+ if (count == 0) {
+ emptyCount++;
+ }
+
+ out.accept(" " + outerName + ":" + outer);
+ out.accept(", " + innerName + ":" + innerMap.keySet());
+ out.accept(", count=" + count);
+ out.accept(", size=" + innerMap.size());
+ out.accept("\n");
+ }
+ out.accept(" emptyCount=" + emptyCount);
+ out.accept("\n");
+ return emptyCount;
+ }
+}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index dac1a51d2..6115bccad 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.protocol;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.TermIndexProto;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.util.BiWeakValueCache;
import java.util.Comparator;
import java.util.Optional;
@@ -73,43 +74,62 @@ public interface TermIndex extends Comparable<TermIndex> {
/** @return a {@link TermIndex} object. */
static TermIndex valueOf(long term, long index) {
- return new TermIndex() {
- @Override
- public long getTerm() {
- return term;
- }
-
- @Override
- public long getIndex() {
- return index;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- } else if (!(obj instanceof TermIndex)) {
- return false;
+ return Impl.getCache().getOrCreate(term, index);
+ }
+
+ /**
+ * An implementation for private use.
+ * Note that this is not a public API, although this is public class.
+ */
+ final class Impl {
+ private Impl() { }
+
+ private static final BiWeakValueCache<Long, Long, TermIndex> CACHE
+ = new BiWeakValueCache<>("term", "index", Impl::newTermIndex);
+
+ static BiWeakValueCache<Long, Long, TermIndex> getCache() {
+ return CACHE;
+ }
+
+ private static TermIndex newTermIndex(long term, long index) {
+ return new TermIndex() {
+ @Override
+ public long getTerm() {
+ return term;
+ }
+
+ @Override
+ public long getIndex() {
+ return index;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (!(obj instanceof TermIndex)) {
+ return false;
+ }
+
+ final TermIndex that = (TermIndex) obj;
+ return this.getTerm() == that.getTerm()
+ && this.getIndex() == that.getIndex();
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(term) ^ Long.hashCode(index);
+ }
+
+ private String longToString(long n) {
+ return n >= 0L ? String.valueOf(n) : "~";
}
- final TermIndex that = (TermIndex) obj;
- return this.getTerm() == that.getTerm()
- && this.getIndex() == that.getIndex();
- }
-
- @Override
- public int hashCode() {
- return Long.hashCode(term) ^ Long.hashCode(index);
- }
-
- private String longToString(long n) {
- return n >= 0L? String.valueOf(n) : "~";
- }
-
- @Override
- public String toString() {
- return String.format("(t:%s, i:%s)", longToString(term),
longToString(index));
- }
- };
+ @Override
+ public String toString() {
+ return String.format("(t:%s, i:%s)", longToString(term),
longToString(index));
+ }
+ };
+ }
}
}
\ No newline at end of file
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java
b/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java
new file mode 100644
index 000000000..dee3f224c
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.protocol;
+
+import org.apache.ratis.util.BiWeakValueCache;
+
+public interface ProtocolTestUtils {
+ static BiWeakValueCache<Long, Long, TermIndex> getTermIndexCache() {
+ return TermIndex.Impl.getCache();
+ }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java
b/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java
new file mode 100644
index 000000000..678d7afe6
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.server.protocol.ProtocolTestUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/** Testing {@link BiWeakValueCache}. */
+public class TestTermIndex extends BaseTest {
+ static BiWeakValueCache<Long, Long, TermIndex> CACHE =
ProtocolTestUtils.getTermIndexCache();
+
+ static void dumpCache(Integer expectedEmptyCount) {
+ final int computed = CACHE.dump(System.out::print);
+ if (expectedEmptyCount != null) {
+ assertEquals(expectedEmptyCount, computed);
+ }
+ System.out.flush();
+ }
+
+ static void assertCacheSize(int expectedSize, long term) {
+ final int computed = CACHE.count(term);
+ if (computed != expectedSize) {
+ dumpCache(null);
+ }
+ assertEquals(expectedSize, computed);
+ }
+
+ void assertCacheSizeWithGC(int expectedSize, long term) throws Exception{
+ JavaUtils.attempt(() -> {
+ RaftTestUtil.gc();
+ assertCacheSize(expectedSize, term);
+ }, 5, HUNDRED_MILLIS, "assertCacheSizeWithGC", LOG);
+ }
+
+ static void initTermIndex(TermIndex[][] ti, int term, int index) {
+ ti[term][index] = TermIndex.valueOf(term, index);
+ }
+
+ @Test
+ public void testCaching() throws Exception {
+ final int n = 9;
+ final TermIndex[][] ti = new TermIndex[n][n];
+ final long[] terms = new long[n];
+ final long[] indices = new long[n];
+ for(int j = 0; j < n; j++) {
+ terms[j] = j;
+ indices[j] = j;
+ }
+
+ assertCacheSize(0, terms[1]);
+ initTermIndex(ti, 1, 1);
+ assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
+ assertCacheSize(1, terms[1]);
+
+ initTermIndex(ti, 1, 2);
+ assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
+ assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
+ assertCacheSize(2, terms[1]);
+ dumpCache(0);
+
+ initTermIndex(ti, 2, 2);
+ assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
+ assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
+ assertSame(ti[2][2], TermIndex.valueOf(terms[2], indices[2]));
+ assertCacheSize(2, terms[1]);
+ assertCacheSize(1, terms[2]);
+ dumpCache(0);
+
+ ti[1][1] = null; // release ti[1][1];
+ assertCacheSizeWithGC(1, terms[1]);
+ dumpCache(0);
+
+ ti[1][2] = null; // release ti[1][2];
+ assertCacheSizeWithGC(0, terms[1]);
+ dumpCache(1);
+
+ CACHE.cleanupEmptyInnerMaps();
+ dumpCache(0);
+ }
+}