This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 0484847d792a842de9f34601274cc3abada0990b
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Mar 19 02:30:47 2025 -0700

    RATIS-2258. Caching TermIndex objects (#1239)
---
 .../org/apache/ratis/util/BiWeakValueCache.java    | 144 +++++++++++++++++++++
 .../apache/ratis/server/protocol/TermIndex.java    |  92 +++++++------
 .../test/java/org/apache/ratis/RaftTestUtil.java   |  15 +++
 .../ratis/server/protocol/ProtocolTestUtils.java   |  26 ++++
 .../java/org/apache/ratis/util/TestTermIndex.java  | 100 ++++++++++++++
 5 files changed, 341 insertions(+), 36 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java b/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java
new file mode 100644
index 000000000..c1aa6bcd5
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.thirdparty.com.google.common.collect.MapMaker;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+/**
+ * Weak Value Cache: ({@link OUTER}, {@link INNER}) -> {@link T}.
+ * <p>
+ * Note that the cached values are weakly referenced.
+ * A cached value could be garbage-collected (i.e. evicted from the cache)
+ * when there are no external (strong) references.
+ *
+ * @param <OUTER> the type of the outer keys.
+ * @param <INNER> the type of the inner keys.
+ * @param <T> the type to be cached.
+ */
+public final class BiWeakValueCache<OUTER, INNER, T> {
+  private static <K, V> ConcurrentMap<K, V> newMap() {
+    return new MapMaker().weakValues().makeMap();
+  }
+
+  private final String outerName;
+  private final String innerName;
+  private final String name;
+
+  /** For constructing {@link T} values from ({@link OUTER}, {@link INNER}) keys. */
+  private final BiFunction<OUTER, INNER, T> constructor;
+  /** Count the number of {@link T} values constructed. */
+  private final AtomicInteger valueCount = new AtomicInteger(0);
+
+  /**
+   * Actual map {@link OUTER} -> ({@link INNER} -> {@link T})
+   * for the logical view ({@link OUTER}, {@link INNER}) -> {@link T}.
+   */
+  private final ConcurrentMap<OUTER, ConcurrentMap<INNER, T>> map = new 
ConcurrentHashMap<>();
+
+  /**
+   * Create a cache for mapping ({@link OUTER}, {@link INNER}) keys to {@link T} values.
+   *
+   * @param outerName the name of the outer key.
+   * @param innerName the name of the inner key.
+   * @param constructor for constructing {@link T} values.
+   */
+  public BiWeakValueCache(String outerName, String innerName, 
BiFunction<OUTER, INNER, T> constructor) {
+    this.outerName = outerName;
+    this.innerName = innerName;
+    this.name = "(" + outerName + ", " + innerName + ")-cache";
+    this.constructor = constructor;
+  }
+
+  private T construct(OUTER outer, INNER inner) {
+    final T constructed = constructor.apply(outer, inner);
+    Objects.requireNonNull(constructed, "constructed == null");
+    valueCount.incrementAndGet();
+    return constructed;
+  }
+
+  /**
+   * If the key ({@link OUTER}, {@link INNER}) is in the cache, return the cached value.
+   * Otherwise, create a new value and then return it.
+   */
+  public T getOrCreate(OUTER outer, INNER inner) {
+    Objects.requireNonNull(outer, () -> outerName + " (outer) == null");
+    Objects.requireNonNull(inner, () -> innerName + " (inner) == null");
+    final ConcurrentMap<INNER, T> innerMap = map.computeIfAbsent(outer, k -> 
newMap());
+    final T computed = innerMap.computeIfAbsent(inner, i -> construct(outer, 
i));
+    if ((valueCount.get() & 0xFFF) == 0) {
+      cleanupEmptyInnerMaps(); // cleanup empty maps once in a while
+    }
+    return computed;
+  }
+
+  /** @return the value count for the given outer key.  */
+  int count(OUTER outer) {
+    final ConcurrentMap<INNER, T> innerMap = map.get(outer);
+    if (innerMap == null) {
+      return 0;
+    }
+
+    // size() may return incorrect result; see Guava MapMaker javadoc
+    int n = 0;
+    for (INNER ignored : innerMap.keySet()) {
+      n++;
+    }
+    return n;
+  }
+
+  void cleanupEmptyInnerMaps() {
+    // isEmpty() may return incorrect result; see Guava MapMaker javadoc
+    map.values().removeIf(e -> !e.entrySet().iterator().hasNext());
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  /** The cache content for debugging. */
+  int dump(Consumer<String> out) {
+    out.accept(name + ":\n");
+    int emptyCount = 0;
+    for (Map.Entry<OUTER, ConcurrentMap<INNER, T>> entry : map.entrySet()) {
+      final OUTER outer = entry.getKey();
+      final ConcurrentMap<INNER, T> innerMap = entry.getValue();
+      final int count = count(outer);
+      if (count == 0) {
+        emptyCount++;
+      }
+
+      out.accept("  " + outerName + ":" + outer);
+      out.accept(", " + innerName + ":" + innerMap.keySet());
+      out.accept(", count=" + count);
+      out.accept(", size=" + innerMap.size());
+      out.accept("\n");
+    }
+    out.accept("  emptyCount=" + emptyCount);
+    out.accept("\n");
+    return emptyCount;
+  }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index dac1a51d2..6115bccad 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.protocol;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.TermIndexProto;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.util.BiWeakValueCache;
 
 import java.util.Comparator;
 import java.util.Optional;
@@ -73,43 +74,62 @@ public interface TermIndex extends Comparable<TermIndex> {
 
   /** @return a {@link TermIndex} object. */
   static TermIndex valueOf(long term, long index) {
-    return new TermIndex() {
-      @Override
-      public long getTerm() {
-        return term;
-      }
-
-      @Override
-      public long getIndex() {
-        return index;
-      }
-
-      @Override
-      public boolean equals(Object obj) {
-        if (obj == this) {
-          return true;
-        } else if (!(obj instanceof TermIndex)) {
-          return false;
+    return Impl.getCache().getOrCreate(term, index);
+  }
+
+  /**
+   * An implementation for private use.
+   * Note that this is not a public API, although this is a public class.
+   */
+  final class Impl {
+    private Impl() { }
+
+    private static final BiWeakValueCache<Long, Long, TermIndex> CACHE
+        = new BiWeakValueCache<>("term", "index", Impl::newTermIndex);
+
+    static BiWeakValueCache<Long, Long, TermIndex> getCache() {
+      return CACHE;
+    }
+
+    private static TermIndex newTermIndex(long term, long index) {
+      return new TermIndex() {
+        @Override
+        public long getTerm() {
+          return term;
+        }
+
+        @Override
+        public long getIndex() {
+          return index;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+          if (obj == this) {
+            return true;
+          } else if (!(obj instanceof TermIndex)) {
+            return false;
+          }
+
+          final TermIndex that = (TermIndex) obj;
+          return this.getTerm() == that.getTerm()
+              && this.getIndex() == that.getIndex();
+        }
+
+        @Override
+        public int hashCode() {
+          return Long.hashCode(term) ^ Long.hashCode(index);
+        }
+
+        private String longToString(long n) {
+          return n >= 0L ? String.valueOf(n) : "~";
         }
 
-        final TermIndex that = (TermIndex) obj;
-        return this.getTerm() == that.getTerm()
-            && this.getIndex() == that.getIndex();
-      }
-
-      @Override
-      public int hashCode() {
-        return Long.hashCode(term) ^ Long.hashCode(index);
-      }
-
-      private String longToString(long n) {
-        return n >= 0L? String.valueOf(n) : "~";
-      }
-
-      @Override
-      public String toString() {
-        return String.format("(t:%s, i:%s)", longToString(term), 
longToString(index));
-      }
-    };
+        @Override
+        public String toString() {
+          return String.format("(t:%s, i:%s)", longToString(term), 
longToString(index));
+        }
+      };
+    }
   }
 }
\ No newline at end of file
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 3c1b3eda2..206b6dd94 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -590,4 +591,18 @@ public interface RaftTestUtil {
     Assert.assertNotNull("reply == null", reply);
     Assert.assertTrue("reply is not success: " + reply, reply.isSuccess());
   }
+
+  static void gc() throws InterruptedException {
+    // use WeakReference to detect gc
+    Object obj = new Object();
+    final WeakReference<Object> weakRef = new WeakReference<>(obj);
+    obj = null;
+
+    // loop until gc has completed.
+    for (int i = 0; weakRef.get() != null; i++) {
+      LOG.info("gc {}", i);
+      System.gc();
+      Thread.sleep(100);
+    }
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java
new file mode 100644
index 000000000..dee3f224c
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/protocol/ProtocolTestUtils.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.protocol;
+
+import org.apache.ratis.util.BiWeakValueCache;
+
+public interface ProtocolTestUtils {
+  static BiWeakValueCache<Long, Long, TermIndex> getTermIndexCache() {
+    return TermIndex.Impl.getCache();
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java
new file mode 100644
index 000000000..678d7afe6
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.server.protocol.ProtocolTestUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/** Testing {@link BiWeakValueCache}. */
+public class TestTermIndex extends BaseTest {
+  static BiWeakValueCache<Long, Long, TermIndex> CACHE = 
ProtocolTestUtils.getTermIndexCache();
+
+  static void dumpCache(Integer expectedEmptyCount) {
+    final int computed = CACHE.dump(System.out::print);
+    if (expectedEmptyCount != null) {
+      assertEquals(expectedEmptyCount, computed);
+    }
+    System.out.flush();
+  }
+
+  static void assertCacheSize(int expectedSize, long term) {
+    final int computed = CACHE.count(term);
+    if (computed != expectedSize) {
+      dumpCache(null);
+    }
+    assertEquals(expectedSize, computed);
+  }
+
+  void assertCacheSizeWithGC(int expectedSize, long term) throws Exception{
+    JavaUtils.attempt(() -> {
+      RaftTestUtil.gc();
+      assertCacheSize(expectedSize, term);
+    }, 5, HUNDRED_MILLIS, "assertCacheSizeWithGC", LOG);
+  }
+
+  static void initTermIndex(TermIndex[][] ti, int term, int index) {
+    ti[term][index] = TermIndex.valueOf(term, index);
+  }
+
+  @Test
+  public void testCaching() throws Exception {
+    final int n = 9;
+    final TermIndex[][] ti = new TermIndex[n][n];
+    final long[] terms = new long[n];
+    final long[] indices = new long[n];
+    for(int j = 0; j < n; j++) {
+      terms[j] = j;
+      indices[j] = j;
+    }
+
+    assertCacheSize(0, terms[1]);
+    initTermIndex(ti, 1, 1);
+    assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
+    assertCacheSize(1, terms[1]);
+
+    initTermIndex(ti, 1, 2);
+    assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
+    assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
+    assertCacheSize(2, terms[1]);
+    dumpCache(0);
+
+    initTermIndex(ti, 2, 2);
+    assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
+    assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
+    assertSame(ti[2][2], TermIndex.valueOf(terms[2], indices[2]));
+    assertCacheSize(2, terms[1]);
+    assertCacheSize(1, terms[2]);
+    dumpCache(0);
+
+    ti[1][1] = null; // release ti[1][1];
+    assertCacheSizeWithGC(1, terms[1]);
+    dumpCache(0);
+
+    ti[1][2] = null; // release ti[1][2];
+    assertCacheSizeWithGC(0, terms[1]);
+    dumpCache(1);
+
+    CACHE.cleanupEmptyInnerMaps();
+    dumpCache(0);
+  }
+}

Reply via email to