This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 60bf78138b HDDS-9456. Non-blocking cache for Container State Machine 
(#5446)
60bf78138b is described below

commit 60bf78138b8470d4d513f3e5eaa9df14d0078daa
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu Oct 26 20:59:46 2023 +0530

    HDDS-9456. Non-blocking cache for Container State Machine (#5446)
---
 .../java/org/apache/hadoop/hdds/utils/Cache.java   |   4 +-
 .../apache/hadoop/hdds/utils/ResourceCache.java    |  81 ++++++++++
 .../hadoop/hdds/utils/ResourceLimitCache.java      |  94 ------------
 .../hadoop/hdds/utils/ResourceSemaphore.java       | 170 ---------------------
 ...ourceLimitCache.java => TestResourceCache.java} |  85 ++++-------
 .../hadoop/hdds/utils/TestResourceSemaphore.java   |  72 ---------
 .../common/transport/server/ratis/CSMMetrics.java  |   4 +
 .../server/ratis/ContainerStateMachine.java        |  21 +--
 8 files changed, 124 insertions(+), 407 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
index efeb69f3f9..e22f9f5cfa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
@@ -26,9 +26,9 @@ public interface Cache<K, V> {
 
   V get(K key);
 
-  V put(K key, V value) throws InterruptedException;
+  void put(K key, V value) throws InterruptedException;
 
-  V remove(K key);
+  void remove(K key);
 
   void removeIf(Predicate<K> predicate);
 
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceCache.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceCache.java
new file mode 100644
index 0000000000..77296051c1
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.Weigher;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * Cache with FIFO eviction and a resource limit. If resource usage crosses
+ * the limit, the first (oldest) entry is removed. This does not guarantee the
+ * exact limit is met, since removing the first entry may not be sufficient.
+ */
+public class ResourceCache<K, V> implements Cache<K, V> {
+  private final com.google.common.cache.Cache<K, V> cache;
+
+  public ResourceCache(
+      Weigher<K, V> weigher, long limits,
+      RemovalListener<K, V> listener) {
+    Objects.requireNonNull(weigher);
+    if (listener == null) {
+      cache = CacheBuilder.newBuilder()
+          .maximumWeight(limits).weigher(weigher).build();
+    } else {
+      cache = CacheBuilder.newBuilder()
+          .maximumWeight(limits).weigher(weigher)
+          .removalListener(listener).build();
+    }
+  }
+
+  @Override
+  public V get(K key) {
+    Objects.requireNonNull(key);
+    return cache.getIfPresent(key);
+  }
+
+  @Override
+  public void put(K key, V value) throws InterruptedException {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(value);
+    cache.put(key, value);
+  }
+
+  @Override
+  public void remove(K key) {
+    Objects.requireNonNull(key);
+    cache.invalidate(key);
+  }
+
+  @Override
+  public void removeIf(Predicate<K> predicate) {
+    Objects.requireNonNull(predicate);
+    for (K key : cache.asMap().keySet()) {
+      if (predicate.test(key)) {
+        remove(key);
+      }
+    }
+  }
+
+  @Override
+  public void clear() {
+    cache.invalidateAll();
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
deleted file mode 100644
index 80c539d266..0000000000
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.utils;
-
-import java.util.Objects;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-
-/**
- * Cache with resource limit constraints. At any time all entries in the cache
- * satisfy the resource limit constraints in the constructor. New put
- * operations are blocked until resources are released via remove or clear
- * operation.
- */
-public class ResourceLimitCache<K, V> implements Cache<K, V> {
-  private final java.util.concurrent.ConcurrentMap<K, V> map;
-  private final ResourceSemaphore.Group group;
-  private final BiFunction<K, V, int[]> permitsSupplier;
-
-  public ResourceLimitCache(java.util.concurrent.ConcurrentMap<K, V> map,
-      BiFunction<K, V, int[]> permitsSupplier, int... limits) {
-    Objects.requireNonNull(map);
-    Objects.requireNonNull(permitsSupplier);
-    Objects.requireNonNull(limits);
-    this.map = map;
-    this.group = new ResourceSemaphore.Group(limits);
-    this.permitsSupplier = permitsSupplier;
-  }
-
-  @Override
-  public V get(K key) {
-    Objects.requireNonNull(key);
-    return map.get(key);
-  }
-
-  @Override
-  public V put(K key, V value) throws InterruptedException {
-    Objects.requireNonNull(key);
-    Objects.requireNonNull(value);
-
-    // remove the old key to release the permits
-    V oldVal = remove(key);
-    int[] permits = permitsSupplier.apply(key, value);
-    group.acquire(permits);
-    try {
-      map.put(key, value);
-    } catch (Throwable t) {
-      group.release(permits);
-    }
-    return oldVal;
-  }
-
-  @Override
-  public V remove(K key) {
-    Objects.requireNonNull(key);
-    V val = map.remove(key);
-    if (val != null) {
-      group.release(permitsSupplier.apply(key, val));
-    }
-    return val;
-  }
-
-  @Override
-  public void removeIf(Predicate<K> predicate) {
-    Objects.requireNonNull(predicate);
-    for (K key : map.keySet()) {
-      if (predicate.test(key)) {
-        remove(key);
-      }
-    }
-  }
-
-  @Override
-  public void clear() {
-    for (K key : map.keySet()) {
-      remove(key);
-    }
-  }
-}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
deleted file mode 100644
index e1e959823e..0000000000
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.utils;
-
-
-import org.apache.ratis.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A {@link Semaphore} with a limit for a resource.
- *
- * After {@link #close()}, the resource becomes unavailable,
- * i.e. any acquire will not succeed.
- */
-public class ResourceSemaphore extends Semaphore {
-  private final int limit;
-  private final AtomicBoolean reducePermits = new AtomicBoolean();
-  private final AtomicBoolean isClosed = new AtomicBoolean();
-
-  public ResourceSemaphore(int limit) {
-    super(limit, true);
-    Preconditions.assertTrue(limit > 0, () -> "limit = " + limit + " <= 0");
-    this.limit = limit;
-  }
-
-  @Override
-  public void release() {
-    release(1);
-  }
-
-  @Override
-  public void release(int permits) {
-    assertRelease(permits);
-    super.release(permits);
-    assertAvailable();
-  }
-
-  private void assertRelease(int toRelease) {
-    Preconditions
-        .assertTrue(toRelease >= 0, () -> "toRelease = " + toRelease + " < 0");
-    final int available = assertAvailable();
-    final int permits = Math.addExact(available, toRelease);
-    Preconditions.assertTrue(permits <= limit,
-        () -> "permits = " + permits + " > limit = " + limit);
-  }
-
-  private int assertAvailable() {
-    final int available = availablePermits();
-    Preconditions
-        .assertTrue(available >= 0, () -> "available = " + available + " < 0");
-    return available;
-  }
-
-  public int used() {
-    return limit - availablePermits();
-  }
-
-  /** Close the resource. */
-  public void close() {
-    if (reducePermits.compareAndSet(false, true)) {
-      reducePermits(limit);
-      isClosed.set(true);
-    }
-  }
-
-  public boolean isClosed() {
-    return isClosed.get();
-  }
-
-  @Override
-  public String toString() {
-    return (isClosed() ? "closed/" : availablePermits() + "/") + limit;
-  }
-
-  /**
-   * Track a group of resources with a list of {@link ResourceSemaphore}s.
-   */
-  public static class Group {
-    private final List<ResourceSemaphore> resources;
-
-    public Group(int... limits) {
-      final List<ResourceSemaphore> list = new ArrayList<>(limits.length);
-      for (int limit : limits) {
-        list.add(new ResourceSemaphore(limit));
-      }
-      this.resources = Collections.unmodifiableList(list);
-    }
-
-    int resourceSize() {
-      return resources.size();
-    }
-
-    protected ResourceSemaphore get(int i) {
-      return resources.get(i);
-    }
-
-    boolean tryAcquire(int... permits) {
-      Preconditions.assertTrue(permits.length == resources.size(),
-          () -> "items.length = " + permits.length + " != resources.size() = "
-              + resources.size());
-      int i = 0;
-      // try acquiring all resources
-      for (; i < permits.length; i++) {
-        if (!resources.get(i).tryAcquire(permits[i])) {
-          break;
-        }
-      }
-      if (i == permits.length) {
-        return true; // successfully acquired all resources
-      }
-
-      // failed at i, releasing all previous resources
-      for (i--; i >= 0; i--) {
-        resources.get(i).release(permits[i]);
-      }
-      return false;
-    }
-
-    public void acquire(int... permits) throws InterruptedException {
-      Preconditions.assertTrue(permits.length == resources.size(),
-          () -> "items.length = " + permits.length + " != resources.size() = "
-              + resources.size());
-      for (int i = 0; i < permits.length; i++) {
-        resources.get(i).acquire(permits[i]);
-      }
-    }
-
-    protected void release(int... permits) {
-      for (int i = resources.size() - 1; i >= 0; i--) {
-        resources.get(i).release(permits[i]);
-      }
-    }
-
-    public void close() {
-      for (int i = resources.size() - 1; i >= 0; i--) {
-        resources.get(i).close();
-      }
-    }
-
-    public boolean isClosed() {
-      return resources.get(resources.size() - 1).isClosed();
-    }
-
-    @Override
-    public String toString() {
-      return resources + ",size=" + resources.size();
-    }
-  }
-}
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceCache.java
similarity index 51%
rename from 
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
rename to 
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceCache.java
index 4dd12fe37f..54d59af03d 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceCache.java
@@ -16,82 +16,49 @@
  */
 package org.apache.hadoop.hdds.utils;
 
-import org.apache.ozone.test.GenericTestUtils;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Consumer;
-
 /**
- * Test for ResourceLimitCache.
+ * Test for ResourceCache.
  */
-public class TestResourceLimitCache {
+public class TestResourceCache {
 
   private static final String ANY_VALUE = "asdf";
 
   @Test
-  public void testResourceLimitCache()
-      throws InterruptedException, TimeoutException {
+  public void testResourceCache() throws InterruptedException {
+    AtomicLong count = new AtomicLong(0);
     Cache<Integer, String> resourceCache =
-        new ResourceLimitCache<>(new ConcurrentHashMap<>(),
-            (k, v) -> new int[] {k}, 10);
+        new ResourceCache<>(
+            (k, v) -> (int) k, 10,
+            (P) -> {
+              if (P.wasEvicted()) {
+                count.incrementAndGet();
+              }
+            });
     resourceCache.put(6, "a");
     resourceCache.put(4, "a");
 
    // put should succeed, as key 4 will be overwritten
     resourceCache.put(4, "a");
 
-    // Create a future which blocks to put 1. Currently map has acquired 10
-    // permits out of 10
-    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
-      try {
-        return resourceCache.put(1, "a");
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      return null;
-    });
-    Assertions.assertFalse(future.isDone());
-    Thread.sleep(100);
-    Assertions.assertFalse(future.isDone());
-
-    // remove 4 so that permits are released for key 1 to be put. Currently map
-    // has acquired 6 permits out of 10
-    resourceCache.remove(4);
+    // put to cache, evicting the oldest element "6" (FIFO)
+    resourceCache.put(1, "a");
+    Assertions.assertNull(resourceCache.get(6));
+    Assertions.assertTrue(count.get() == 1);
+
+    // adding 5 should succeed with no eviction
+    resourceCache.put(5, "a");
+    Assertions.assertNotNull(resourceCache.get(4));
 
-    GenericTestUtils.waitFor(future::isDone, 100, 1000);
-    // map has the key 1
-    Assertions.assertTrue(future.isDone());
-    Assertions.assertFalse(future.isCompletedExceptionally());
-    Assertions.assertNotNull(resourceCache.get(1));
-
-    // Create a future which blocks to put 4. Currently map has acquired 7
-    // permits out of 10
-    ExecutorService pool = Executors.newCachedThreadPool();
-    future = CompletableFuture.supplyAsync(() -> {
-      try {
-        return resourceCache.put(4, "a");
-      } catch (InterruptedException e) {
-        return null;
-      }
-    }, pool);
-    Assertions.assertFalse(future.isDone());
-    Thread.sleep(100);
-    Assertions.assertFalse(future.isDone());
-
-    // Shutdown the thread pool for putting key 4
-    pool.shutdownNow();
-    // Mark the future as cancelled
-    future.cancel(true);
-    // remove key 1 so currently map has acquired 6 permits out of 10
-    resourceCache.remove(1);
+    // remove and check queue
+    resourceCache.remove(4);
     Assertions.assertNull(resourceCache.get(4));
+    Assertions.assertTrue(count.get() == 1);
   }
 
   @Test
@@ -118,8 +85,8 @@ public class TestResourceLimitCache {
     // GIVEN
     final int maxSize = 3;
     Cache<Integer, String> resourceCache =
-        new ResourceLimitCache<>(new ConcurrentHashMap<>(),
-            (k, v) -> new int[] {1}, maxSize);
+        new ResourceCache<>(
+            (k, v) -> 1, maxSize, null);
     for (int i = 1; i <= maxSize; ++i) {
       resourceCache.put(i, ANY_VALUE);
     }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
deleted file mode 100644
index 43e74e687f..0000000000
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.utils;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/**
- * Test for ResourceSemaphore.
- */
-public class TestResourceSemaphore {
-  @Test
-  @Timeout(1)
-  public void testGroup() {
-    final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
-
-    assertUsed(g, 0, 0);
-    assertAcquire(g, true, 1, 1);
-    assertUsed(g, 1, 1);
-    assertAcquire(g, false, 1, 1);
-    assertUsed(g, 1, 1);
-    assertAcquire(g, false, 0, 1);
-    assertUsed(g, 1, 1);
-    assertAcquire(g, true, 1, 0);
-    assertUsed(g, 2, 1);
-    assertAcquire(g, true, 1, 0);
-    assertUsed(g, 3, 1);
-    assertAcquire(g, false, 1, 0);
-    assertUsed(g, 3, 1);
-
-    g.release(1, 1);
-    assertUsed(g, 2, 0);
-    g.release(2, 0);
-    assertUsed(g, 0, 0);
-    g.release(0, 0);
-    assertUsed(g, 0, 0);
-
-    assertThrows(IllegalStateException.class, () -> g.release(1, 0));
-    assertThrows(IllegalStateException.class, () -> g.release(0, 1));
-  }
-
-  static void assertUsed(ResourceSemaphore.Group g, int... expected) {
-    Assertions.assertEquals(expected.length, g.resourceSize());
-    for (int i = 0; i < expected.length; i++) {
-      Assertions.assertEquals(expected[i], g.get(i).used());
-    }
-  }
-
-  static void assertAcquire(ResourceSemaphore.Group g, boolean expected,
-      int... permits) {
-    final boolean computed = g.tryAcquire(permits);
-    Assertions.assertEquals(expected, computed);
-  }
-}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
index d5c4f7ba0e..b776dc903d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -61,6 +61,7 @@ public class CSMMetrics {
   private @Metric MutableCounterLong numContainerNotOpenVerifyFailures;
   private @Metric MutableCounterLong numDataCacheMiss;
   private @Metric MutableCounterLong numDataCacheHit;
+  private @Metric MutableCounterLong numEvictedCacheCount;
 
   private @Metric MutableRate applyTransactionNs;
   private @Metric MutableRate writeStateMachineDataNs;
@@ -224,6 +225,9 @@ public class CSMMetrics {
   public void incNumDataCacheHit() {
     numDataCacheHit.incr();
   }
+  public void incNumEvictedCacheCount() {
+    numEvictedCacheCount.incr();
+  }
 
   public void unRegister() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0758b245fb..ae32df6f45 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -42,7 +42,6 @@ import java.util.stream.Collectors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Container2BCSIDMapProto;
@@ -57,7 +56,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.Cache;
-import org.apache.hadoop.hdds.utils.ResourceLimitCache;
+import org.apache.hadoop.hdds.utils.ResourceCache;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -197,18 +196,20 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     metrics = CSMMetrics.create(gid);
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     applyTransactionCompletionMap = new ConcurrentHashMap<>();
-    int numPendingRequests = conf
-        .getObject(DatanodeRatisServerConfig.class)
-        .getLeaderNumPendingRequests();
     long pendingRequestsBytesLimit = (long)conf.getStorageSize(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT,
         StorageUnit.BYTES);
-    int pendingRequestsMegaBytesLimit =
-        HddsUtils.roundupMb(pendingRequestsBytesLimit);
-    stateMachineDataCache = new ResourceLimitCache<>(new ConcurrentHashMap<>(),
-        (index, data) -> new int[] {1, HddsUtils.roundupMb(data.size())},
-        numPendingRequests, pendingRequestsMegaBytesLimit);
+    // Cache with FIFO eviction; if an element is not found in the cache,
+    // it must be read from disk (slow-follower path).
+    stateMachineDataCache = new ResourceCache<>(
+        (index, data) -> ((ByteString)data).size(),
+        pendingRequestsBytesLimit,
+        (p) -> {
+          if (p.wasEvicted()) {
+            metrics.incNumEvictedCacheCount();
+          }
+        });
 
     this.chunkExecutors = chunkExecutors;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to