This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new dafd34789d8 [fix][ml] Fix memory leak due to duplicated RangeCache
value retain operations (#23955)
dafd34789d8 is described below
commit dafd34789d8b341339ad869493d097e216ddc472
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Feb 11 02:17:53 2025 +0800
[fix][ml] Fix memory leak due to duplicated RangeCache value retain
operations (#23955)
Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit 20b3b22368b96fcfdd0aa65332b58deb4b518656)
---
.../apache/bookkeeper/mledger/util/RangeCache.java | 96 ++++++++--------------
.../bookkeeper/mledger/util/RangeCacheTest.java | 33 +++++++-
2 files changed, 62 insertions(+), 67 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 0de6f943622..c1de09f10a6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -284,6 +284,9 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
}
}
+ /**
+ * @apiNote the returned value must be released if it's not null
+ */
private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key,
Value>> entry) {
Value valueMatchingEntry =
EntryWrapper.getValueMatchingMapEntry(entry);
return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
@@ -291,6 +294,9 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
// validates that the value matches the key and that the value has not
been recycled
// which are possible due to the lack of exclusive locks in the cache and
the use of reference counted objects
+ /**
+ * @apiNote the returned value must be released if it's not null
+ */
private Value getRetainedValueMatchingKey(Key key, Value value) {
if (value == null) {
// the wrapper has been recycled and contains another key
@@ -350,7 +356,7 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
RemovalCounters counters = RemovalCounters.create();
Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first,
true, last, lastInclusive);
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry :
subMap.entrySet()) {
- removeEntry(entry, counters, true);
+ removeEntry(entry, counters);
}
return handleRemovalResult(counters);
}
@@ -361,84 +367,48 @@ public class RangeCache<Key extends Comparable<Key>,
Value extends ValueWithKeyV
BREAK_LOOP;
}
- private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key,
Value>> entry, RemovalCounters counters,
- boolean skipInvalid) {
- return removeEntry(entry, counters, skipInvalid, x -> true);
+ private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key,
Value>> entry, RemovalCounters counters) {
+ return removeEntry(entry, counters, x -> true);
}
private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key,
Value>> entry, RemovalCounters counters,
- boolean skipInvalid,
Predicate<Value> removeCondition) {
+ Predicate<Value> removeCondition) {
Key key = entry.getKey();
EntryWrapper<Key, Value> entryWrapper = entry.getValue();
Value value = getValueMatchingEntry(entry);
if (value == null) {
- // the wrapper has already been recycled and contains another key
- if (!skipInvalid) {
- EntryWrapper<Key, Value> removed = entries.remove(key);
- if (removed != null) {
- // log and remove the entry without releasing the value
- log.info("Key {} does not match the entry's value
wrapper's key {}, removed entry by key without "
- + "releasing the value", key,
entryWrapper.getKey());
- counters.entryRemoved(removed.getSize());
- return RemoveEntryResult.ENTRY_REMOVED;
- }
- }
- return RemoveEntryResult.CONTINUE_LOOP;
- }
- try {
- // add extra retain to avoid value being released while we are
removing it
- value.retain();
- } catch (IllegalReferenceCountException e) {
- // Value was already released
- if (!skipInvalid) {
- // remove the specific entry without releasing the value
- if (entries.remove(key, entryWrapper)) {
- log.info("Value was already released for key {}, removed
entry without releasing the value", key);
- counters.entryRemoved(entryWrapper.getSize());
- return RemoveEntryResult.ENTRY_REMOVED;
- }
- }
+ // the wrapper has already been recycled or contains another key
+ entries.remove(key, entryWrapper);
return RemoveEntryResult.CONTINUE_LOOP;
}
- if (!value.matchesKey(key)) {
- // this is unexpected since the IdentityWrapper.getValue(key)
already checked that the value matches the key
- log.warn("Unexpected race condition. Value {} does not match the
key {}. Removing entry.", value, key);
- }
try {
if (!removeCondition.test(value)) {
return RemoveEntryResult.BREAK_LOOP;
}
- if (!skipInvalid) {
- // remove the specific entry
- boolean entryRemoved = entries.remove(key, entryWrapper);
- if (entryRemoved) {
- counters.entryRemoved(entryWrapper.getSize());
- // check that the value hasn't been recycled in between
- // there should be at least 2 references since this method
adds one and the cache should have
- // one reference. it is valid that the value contains
references even after the key has been
- // removed from the cache
- if (value.refCnt() > 1) {
- entryWrapper.recycle();
- // remove the cache reference
- value.release();
- } else {
- log.info("Unexpected refCnt {} for key {}, removed
entry without releasing the value",
- value.refCnt(), key);
- }
- }
- } else if (skipInvalid && value.refCnt() > 1 &&
entries.remove(key, entryWrapper)) {
- // when skipInvalid is true, we don't remove the entry if it
doesn't match matches the key
- // or the refCnt is invalid
+ // remove the specific entry
+ boolean entryRemoved = entries.remove(key, entryWrapper);
+ if (entryRemoved) {
counters.entryRemoved(entryWrapper.getSize());
- entryWrapper.recycle();
- // remove the cache reference
- value.release();
+ // check that the value hasn't been recycled in between
+ // there should be at least 2 references since this method
adds one and the cache should have
+ // one reference. it is valid that the value contains
references even after the key has been
+ // removed from the cache
+ if (value.refCnt() > 1) {
+ entryWrapper.recycle();
+ // remove the cache reference
+ value.release();
+ } else {
+ log.info("Unexpected refCnt {} for key {}, removed entry
without releasing the value",
+ value.refCnt(), key);
+ }
+ return RemoveEntryResult.ENTRY_REMOVED;
+ } else {
+ return RemoveEntryResult.CONTINUE_LOOP;
}
} finally {
// remove the extra retain
value.release();
}
- return RemoveEntryResult.ENTRY_REMOVED;
}
private Pair<Integer, Long> handleRemovalResult(RemovalCounters counters) {
@@ -464,7 +434,7 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
if (entry == null) {
break;
}
- removeEntry(entry, counters, false);
+ removeEntry(entry, counters);
}
return handleRemovalResult(counters);
}
@@ -484,7 +454,7 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
if (entry == null) {
break;
}
- if (removeEntry(entry, counters, false, value ->
timestampExtractor.getTimestamp(value) <= maxTimestamp)
+ if (removeEntry(entry, counters, value ->
timestampExtractor.getTimestamp(value) <= maxTimestamp)
== RemoveEntryResult.BREAK_LOOP) {
break;
}
@@ -518,7 +488,7 @@ public class RangeCache<Key extends Comparable<Key>, Value
extends ValueWithKeyV
if (entry == null) {
break;
}
- removeEntry(entry, counters, false);
+ removeEntry(entry, counters);
}
return handleRemovalResult(counters);
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index aa13d4b8e34..b6914fd8efe 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -27,13 +27,16 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.Data;
import org.apache.commons.lang3.tuple.Pair;
import org.awaitility.Awaitility;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class RangeCacheTest {
@@ -140,9 +143,14 @@ public class RangeCacheTest {
assertEquals(cache.getNumberOfEntries(), 2);
}
+ @DataProvider
+ public static Object[][] retainBeforeEviction() {
+ return new Object[][]{ { true }, { false } };
+ }
- @Test
- public void customTimeExtraction() {
+
+ @Test(dataProvider = "retainBeforeEviction")
+ public void customTimeExtraction(boolean retain) {
RangeCache<Integer, RefString> cache = new RangeCache<>(value ->
value.s.length(), x -> x.s.length());
cache.put(1, new RefString("1"));
@@ -152,13 +160,30 @@ public class RangeCacheTest {
assertEquals(cache.getSize(), 10);
assertEquals(cache.getNumberOfEntries(), 4);
+ final var retainedEntries = cache.getRange(1, 4444);
+ for (final var entry : retainedEntries) {
+ assertEquals(entry.refCnt(), 2);
+ if (!retain) {
+ entry.release();
+ }
+ }
Pair<Integer, Long> evictedSize =
cache.evictLEntriesBeforeTimestamp(3);
assertEquals(evictedSize.getRight().longValue(), 6);
assertEquals(evictedSize.getLeft().longValue(), 3);
-
assertEquals(cache.getSize(), 4);
assertEquals(cache.getNumberOfEntries(), 1);
+
+ if (retain) {
+ final var valueToRefCnt =
retainedEntries.stream().collect(Collectors.toMap(RefString::getS,
+ AbstractReferenceCounted::refCnt));
+ assertEquals(valueToRefCnt, Map.of("1", 1, "22", 1, "333", 1,
"4444", 2));
+ retainedEntries.forEach(AbstractReferenceCounted::release);
+ } else {
+ final var valueToRefCnt = retainedEntries.stream().filter(v ->
v.refCnt() > 0).collect(Collectors.toMap(
+ RefString::getS, AbstractReferenceCounted::refCnt));
+ assertEquals(valueToRefCnt, Map.of("4444", 1));
+ }
}
@Test
@@ -355,4 +380,4 @@ public class RangeCacheTest {
// the value should be found
assertEquals(s.s, "129");
}
-}
\ No newline at end of file
+}