This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 60bf78138b HDDS-9456. Non-blocking cache for Container State Machine
(#5446)
60bf78138b is described below
commit 60bf78138b8470d4d513f3e5eaa9df14d0078daa
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu Oct 26 20:59:46 2023 +0530
HDDS-9456. Non-blocking cache for Container State Machine (#5446)
---
.../java/org/apache/hadoop/hdds/utils/Cache.java | 4 +-
.../apache/hadoop/hdds/utils/ResourceCache.java | 81 ++++++++++
.../hadoop/hdds/utils/ResourceLimitCache.java | 94 ------------
.../hadoop/hdds/utils/ResourceSemaphore.java | 170 ---------------------
...ourceLimitCache.java => TestResourceCache.java} | 85 ++++-------
.../hadoop/hdds/utils/TestResourceSemaphore.java | 72 ---------
.../common/transport/server/ratis/CSMMetrics.java | 4 +
.../server/ratis/ContainerStateMachine.java | 21 +--
8 files changed, 124 insertions(+), 407 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
index efeb69f3f9..e22f9f5cfa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
@@ -26,9 +26,9 @@ public interface Cache<K, V> {
V get(K key);
- V put(K key, V value) throws InterruptedException;
+ void put(K key, V value) throws InterruptedException;
- V remove(K key);
+ void remove(K key);
void removeIf(Predicate<K> predicate);
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceCache.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceCache.java
new file mode 100644
index 0000000000..77296051c1
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.Weigher;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * Cache with FIFO functionality and a limit. If resource usage crosses the
+ * limit, the first entry will be removed. This does not ensure meeting the
+ * exact limit, as removing the first entry may not be enough to meet it.
+ */
+public class ResourceCache<K, V> implements Cache<K, V> {
+ private final com.google.common.cache.Cache<K, V> cache;
+
+ public ResourceCache(
+ Weigher<K, V> weigher, long limits,
+ RemovalListener<K, V> listener) {
+ Objects.requireNonNull(weigher);
+ if (listener == null) {
+ cache = CacheBuilder.newBuilder()
+ .maximumWeight(limits).weigher(weigher).build();
+ } else {
+ cache = CacheBuilder.newBuilder()
+ .maximumWeight(limits).weigher(weigher)
+ .removalListener(listener).build();
+ }
+ }
+
+ @Override
+ public V get(K key) {
+ Objects.requireNonNull(key);
+ return cache.getIfPresent(key);
+ }
+
+ @Override
+ public void put(K key, V value) throws InterruptedException {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(value);
+ cache.put(key, value);
+ }
+
+ @Override
+ public void remove(K key) {
+ Objects.requireNonNull(key);
+ cache.invalidate(key);
+ }
+
+ @Override
+ public void removeIf(Predicate<K> predicate) {
+ Objects.requireNonNull(predicate);
+ for (K key : cache.asMap().keySet()) {
+ if (predicate.test(key)) {
+ remove(key);
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ cache.invalidateAll();
+ }
+}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
deleted file mode 100644
index 80c539d266..0000000000
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.utils;
-
-import java.util.Objects;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-
-/**
- * Cache with resource limit constraints. At any time all entries in the cache
- * satisfy the resource limit constraints in the constructor. New put
- * operations are blocked until resources are released via remove or clear
- * operation.
- */
-public class ResourceLimitCache<K, V> implements Cache<K, V> {
- private final java.util.concurrent.ConcurrentMap<K, V> map;
- private final ResourceSemaphore.Group group;
- private final BiFunction<K, V, int[]> permitsSupplier;
-
- public ResourceLimitCache(java.util.concurrent.ConcurrentMap<K, V> map,
- BiFunction<K, V, int[]> permitsSupplier, int... limits) {
- Objects.requireNonNull(map);
- Objects.requireNonNull(permitsSupplier);
- Objects.requireNonNull(limits);
- this.map = map;
- this.group = new ResourceSemaphore.Group(limits);
- this.permitsSupplier = permitsSupplier;
- }
-
- @Override
- public V get(K key) {
- Objects.requireNonNull(key);
- return map.get(key);
- }
-
- @Override
- public V put(K key, V value) throws InterruptedException {
- Objects.requireNonNull(key);
- Objects.requireNonNull(value);
-
- // remove the old key to release the permits
- V oldVal = remove(key);
- int[] permits = permitsSupplier.apply(key, value);
- group.acquire(permits);
- try {
- map.put(key, value);
- } catch (Throwable t) {
- group.release(permits);
- }
- return oldVal;
- }
-
- @Override
- public V remove(K key) {
- Objects.requireNonNull(key);
- V val = map.remove(key);
- if (val != null) {
- group.release(permitsSupplier.apply(key, val));
- }
- return val;
- }
-
- @Override
- public void removeIf(Predicate<K> predicate) {
- Objects.requireNonNull(predicate);
- for (K key : map.keySet()) {
- if (predicate.test(key)) {
- remove(key);
- }
- }
- }
-
- @Override
- public void clear() {
- for (K key : map.keySet()) {
- remove(key);
- }
- }
-}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
deleted file mode 100644
index e1e959823e..0000000000
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.utils;
-
-
-import org.apache.ratis.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A {@link Semaphore} with a limit for a resource.
- *
- * After {@link #close()}, the resource becomes unavailable,
- * i.e. any acquire will not succeed.
- */
-public class ResourceSemaphore extends Semaphore {
- private final int limit;
- private final AtomicBoolean reducePermits = new AtomicBoolean();
- private final AtomicBoolean isClosed = new AtomicBoolean();
-
- public ResourceSemaphore(int limit) {
- super(limit, true);
- Preconditions.assertTrue(limit > 0, () -> "limit = " + limit + " <= 0");
- this.limit = limit;
- }
-
- @Override
- public void release() {
- release(1);
- }
-
- @Override
- public void release(int permits) {
- assertRelease(permits);
- super.release(permits);
- assertAvailable();
- }
-
- private void assertRelease(int toRelease) {
- Preconditions
- .assertTrue(toRelease >= 0, () -> "toRelease = " + toRelease + " < 0");
- final int available = assertAvailable();
- final int permits = Math.addExact(available, toRelease);
- Preconditions.assertTrue(permits <= limit,
- () -> "permits = " + permits + " > limit = " + limit);
- }
-
- private int assertAvailable() {
- final int available = availablePermits();
- Preconditions
- .assertTrue(available >= 0, () -> "available = " + available + " < 0");
- return available;
- }
-
- public int used() {
- return limit - availablePermits();
- }
-
- /** Close the resource. */
- public void close() {
- if (reducePermits.compareAndSet(false, true)) {
- reducePermits(limit);
- isClosed.set(true);
- }
- }
-
- public boolean isClosed() {
- return isClosed.get();
- }
-
- @Override
- public String toString() {
- return (isClosed() ? "closed/" : availablePermits() + "/") + limit;
- }
-
- /**
- * Track a group of resources with a list of {@link ResourceSemaphore}s.
- */
- public static class Group {
- private final List<ResourceSemaphore> resources;
-
- public Group(int... limits) {
- final List<ResourceSemaphore> list = new ArrayList<>(limits.length);
- for (int limit : limits) {
- list.add(new ResourceSemaphore(limit));
- }
- this.resources = Collections.unmodifiableList(list);
- }
-
- int resourceSize() {
- return resources.size();
- }
-
- protected ResourceSemaphore get(int i) {
- return resources.get(i);
- }
-
- boolean tryAcquire(int... permits) {
- Preconditions.assertTrue(permits.length == resources.size(),
- () -> "items.length = " + permits.length + " != resources.size() = "
- + resources.size());
- int i = 0;
- // try acquiring all resources
- for (; i < permits.length; i++) {
- if (!resources.get(i).tryAcquire(permits[i])) {
- break;
- }
- }
- if (i == permits.length) {
- return true; // successfully acquired all resources
- }
-
- // failed at i, releasing all previous resources
- for (i--; i >= 0; i--) {
- resources.get(i).release(permits[i]);
- }
- return false;
- }
-
- public void acquire(int... permits) throws InterruptedException {
- Preconditions.assertTrue(permits.length == resources.size(),
- () -> "items.length = " + permits.length + " != resources.size() = "
- + resources.size());
- for (int i = 0; i < permits.length; i++) {
- resources.get(i).acquire(permits[i]);
- }
- }
-
- protected void release(int... permits) {
- for (int i = resources.size() - 1; i >= 0; i--) {
- resources.get(i).release(permits[i]);
- }
- }
-
- public void close() {
- for (int i = resources.size() - 1; i >= 0; i--) {
- resources.get(i).close();
- }
- }
-
- public boolean isClosed() {
- return resources.get(resources.size() - 1).isClosed();
- }
-
- @Override
- public String toString() {
- return resources + ",size=" + resources.size();
- }
- }
-}
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceCache.java
similarity index 51%
rename from
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
rename to
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceCache.java
index 4dd12fe37f..54d59af03d 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceCache.java
@@ -16,82 +16,49 @@
*/
package org.apache.hadoop.hdds.utils;
-import org.apache.ozone.test.GenericTestUtils;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Consumer;
-
/**
- * Test for ResourceLimitCache.
+ * Test for ResourceCache.
*/
-public class TestResourceLimitCache {
+public class TestResourceCache {
private static final String ANY_VALUE = "asdf";
@Test
- public void testResourceLimitCache()
- throws InterruptedException, TimeoutException {
+ public void testResourceCache() throws InterruptedException {
+ AtomicLong count = new AtomicLong(0);
Cache<Integer, String> resourceCache =
- new ResourceLimitCache<>(new ConcurrentHashMap<>(),
- (k, v) -> new int[] {k}, 10);
+ new ResourceCache<>(
+ (k, v) -> (int) k, 10,
+ (P) -> {
+ if (P.wasEvicted()) {
+ count.incrementAndGet();
+ }
+ });
resourceCache.put(6, "a");
resourceCache.put(4, "a");
// put should pass as key 4 will be overwritten
resourceCache.put(4, "a");
- // Create a future which blocks to put 1. Currently map has acquired 10
- // permits out of 10
- CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
- try {
- return resourceCache.put(1, "a");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return null;
- });
- Assertions.assertFalse(future.isDone());
- Thread.sleep(100);
- Assertions.assertFalse(future.isDone());
-
- // remove 4 so that permits are released for key 1 to be put. Currently map
- // has acquired 6 permits out of 10
- resourceCache.remove(4);
+ // put to cache, which removes the old element "6" due to FIFO eviction
+ resourceCache.put(1, "a");
+ Assertions.assertNull(resourceCache.get(6));
+ Assertions.assertTrue(count.get() == 1);
+
+ // adding 5 should succeed with no removal
+ resourceCache.put(5, "a");
+ Assertions.assertNotNull(resourceCache.get(4));
- GenericTestUtils.waitFor(future::isDone, 100, 1000);
- // map has the key 1
- Assertions.assertTrue(future.isDone());
- Assertions.assertFalse(future.isCompletedExceptionally());
- Assertions.assertNotNull(resourceCache.get(1));
-
- // Create a future which blocks to put 4. Currently map has acquired 7
- // permits out of 10
- ExecutorService pool = Executors.newCachedThreadPool();
- future = CompletableFuture.supplyAsync(() -> {
- try {
- return resourceCache.put(4, "a");
- } catch (InterruptedException e) {
- return null;
- }
- }, pool);
- Assertions.assertFalse(future.isDone());
- Thread.sleep(100);
- Assertions.assertFalse(future.isDone());
-
- // Shutdown the thread pool for putting key 4
- pool.shutdownNow();
- // Mark the future as cancelled
- future.cancel(true);
- // remove key 1 so currently map has acquired 6 permits out of 10
- resourceCache.remove(1);
+ // remove and check queue
+ resourceCache.remove(4);
Assertions.assertNull(resourceCache.get(4));
+ Assertions.assertTrue(count.get() == 1);
}
@Test
@@ -118,8 +85,8 @@ public class TestResourceLimitCache {
// GIVEN
final int maxSize = 3;
Cache<Integer, String> resourceCache =
- new ResourceLimitCache<>(new ConcurrentHashMap<>(),
- (k, v) -> new int[] {1}, maxSize);
+ new ResourceCache<>(
+ (k, v) -> 1, maxSize, null);
for (int i = 1; i <= maxSize; ++i) {
resourceCache.put(i, ANY_VALUE);
}
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
deleted file mode 100644
index 43e74e687f..0000000000
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.utils;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/**
- * Test for ResourceSemaphore.
- */
-public class TestResourceSemaphore {
- @Test
- @Timeout(1)
- public void testGroup() {
- final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
-
- assertUsed(g, 0, 0);
- assertAcquire(g, true, 1, 1);
- assertUsed(g, 1, 1);
- assertAcquire(g, false, 1, 1);
- assertUsed(g, 1, 1);
- assertAcquire(g, false, 0, 1);
- assertUsed(g, 1, 1);
- assertAcquire(g, true, 1, 0);
- assertUsed(g, 2, 1);
- assertAcquire(g, true, 1, 0);
- assertUsed(g, 3, 1);
- assertAcquire(g, false, 1, 0);
- assertUsed(g, 3, 1);
-
- g.release(1, 1);
- assertUsed(g, 2, 0);
- g.release(2, 0);
- assertUsed(g, 0, 0);
- g.release(0, 0);
- assertUsed(g, 0, 0);
-
- assertThrows(IllegalStateException.class, () -> g.release(1, 0));
- assertThrows(IllegalStateException.class, () -> g.release(0, 1));
- }
-
- static void assertUsed(ResourceSemaphore.Group g, int... expected) {
- Assertions.assertEquals(expected.length, g.resourceSize());
- for (int i = 0; i < expected.length; i++) {
- Assertions.assertEquals(expected[i], g.get(i).used());
- }
- }
-
- static void assertAcquire(ResourceSemaphore.Group g, boolean expected,
- int... permits) {
- final boolean computed = g.tryAcquire(permits);
- Assertions.assertEquals(expected, computed);
- }
-}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
index d5c4f7ba0e..b776dc903d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -61,6 +61,7 @@ public class CSMMetrics {
private @Metric MutableCounterLong numContainerNotOpenVerifyFailures;
private @Metric MutableCounterLong numDataCacheMiss;
private @Metric MutableCounterLong numDataCacheHit;
+ private @Metric MutableCounterLong numEvictedCacheCount;
private @Metric MutableRate applyTransactionNs;
private @Metric MutableRate writeStateMachineDataNs;
@@ -224,6 +225,9 @@ public class CSMMetrics {
public void incNumDataCacheHit() {
numDataCacheHit.incr();
}
+ public void incNumEvictedCacheCount() {
+ numEvictedCacheCount.incr();
+ }
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0758b245fb..ae32df6f45 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -42,7 +42,6 @@ import java.util.stream.Collectors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Container2BCSIDMapProto;
@@ -57,7 +56,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.Cache;
-import org.apache.hadoop.hdds.utils.ResourceLimitCache;
+import org.apache.hadoop.hdds.utils.ResourceCache;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -197,18 +196,20 @@ public class ContainerStateMachine extends
BaseStateMachine {
metrics = CSMMetrics.create(gid);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
applyTransactionCompletionMap = new ConcurrentHashMap<>();
- int numPendingRequests = conf
- .getObject(DatanodeRatisServerConfig.class)
- .getLeaderNumPendingRequests();
long pendingRequestsBytesLimit = (long)conf.getStorageSize(
OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT,
StorageUnit.BYTES);
- int pendingRequestsMegaBytesLimit =
- HddsUtils.roundupMb(pendingRequestsBytesLimit);
- stateMachineDataCache = new ResourceLimitCache<>(new ConcurrentHashMap<>(),
- (index, data) -> new int[] {1, HddsUtils.roundupMb(data.size())},
- numPendingRequests, pendingRequestsMegaBytesLimit);
+ // cache with FIFO eviction; if an element is not found, it needs
+ // to be obtained from disk for a slow follower
+ stateMachineDataCache = new ResourceCache<>(
+ (index, data) -> ((ByteString)data).size(),
+ pendingRequestsBytesLimit,
+ (p) -> {
+ if (p.wasEvicted()) {
+ metrics.incNumEvictedCacheCount();
+ }
+ });
this.chunkExecutors = chunkExecutors;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]