This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6c53d0c37f KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) (#19059)
a6c53d0c37f is described below
commit a6c53d0c37f10d5e69c2b07db8b5445e00a70d24
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Mar 3 16:44:34 2025 +0000
KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) (#19059)
The PR implements the ShareSessionCache and DelayedShareFetchMetrics as
defined in KIP-1103.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 19 ++++++
.../kafka/server/share/DelayedShareFetchTest.java | 18 +++++
.../kafka/server/share/SharePartitionTest.java | 17 +----
.../server/share/session/ShareSessionCache.java | 27 ++++++++
.../server/share/fetch/ShareFetchTestUtils.java | 31 +++++++++
.../share/session/ShareSessionCacheTest.java | 79 ++++++++++++++++++----
6 files changed, 162 insertions(+), 29 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index fb5c05a4c75..80399f4e05c 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -36,6 +37,8 @@ import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+import com.yammer.metrics.core.Meter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -61,6 +65,8 @@ public class DelayedShareFetch extends DelayedOperation {
private static final Logger log =
LoggerFactory.getLogger(DelayedShareFetch.class);
+ private static final String EXPIRES_PER_SEC = "ExpiresPerSec";
+
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
@@ -70,6 +76,10 @@ public class DelayedShareFetch extends DelayedOperation {
// The topic partitions that need to be completed for the share fetch
request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of
insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition>
sharePartitions;
+ /**
+ * Metric for the rate of expired delayed fetch requests.
+ */
+ private final Meter expiredRequestMeter;
// Tracks the start time to acquire any share partition for a fetch
request.
private long acquireStartTimeMs;
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
@@ -124,10 +134,14 @@ public class DelayedShareFetch extends DelayedOperation {
this.shareGroupMetrics = shareGroupMetrics;
this.time = time;
this.acquireStartTimeMs = time.hiResClockMs();
+ // Register metrics for DelayedShareFetch.
+ KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics");
+ this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS);
}
@Override
public void onExpiration() {
+ expiredRequestMeter.mark();
}
/**
@@ -514,4 +528,9 @@ public class DelayedShareFetch extends DelayedOperation {
Lock lock() {
return lock;
}
+
+ // Visible for testing.
+ Meter expiredRequestMeter() {
+ return expiredRequestMeter;
+ }
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 7eb6584bed7..8669474a80c 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -154,6 +154,7 @@ public class DelayedShareFetchTest {
// Metrics shall not be recorded as no partition is acquired.
assertNull(shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId));
assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
+ assertEquals(0, delayedShareFetch.expiredRequestMeter().count());
delayedShareFetch.lock().unlock();
}
@@ -1118,6 +1119,23 @@ public class DelayedShareFetchTest {
true);
}
+ @Test
+ public void testOnCompleteExecutionOnTimeout() {
+ ShareFetch shareFetch = new ShareFetch(
+ FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+ new CompletableFuture<>(), new LinkedHashMap<>(), BATCH_SIZE,
MAX_FETCH_RECORDS,
+ BROKER_TOPIC_STATS);
+ DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
+ .withShareFetchData(shareFetch)
+ .build();
+ assertFalse(delayedShareFetch.isCompleted());
+ assertFalse(shareFetch.isCompleted());
+ // Call run to execute onComplete and onExpiration.
+ delayedShareFetch.run();
+ assertTrue(shareFetch.isCompleted());
+ assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
+ }
+
static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager
replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1,
minBytes);
LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1,
mock(LogOffsetMetadata.class),
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index a2c6e7ed30b..7f2ec081885 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -48,7 +48,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
-import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
@@ -67,8 +66,6 @@ import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
-import com.yammer.metrics.core.Gauge;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -91,6 +88,7 @@ import java.util.concurrent.TimeUnit;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -6736,19 +6734,6 @@ public class SharePartitionTest {
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
}
- private Number yammerMetricValue(String name) {
- try {
- Gauge gauge = (Gauge)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
- .filter(e -> e.getKey().getMBeanName().contains(name))
- .findFirst()
- .orElseThrow()
- .getValue();
- return (Number) gauge.value();
- } catch (Exception e) {
- return 0;
- }
- }
-
private static class SharePartitionBuilder {
private int defaultAcquisitionLockTimeoutMs = 30000;
diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index 28df334d0ac..dc870fc9c25 100644
--- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
+++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -20,11 +20,15 @@ package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.share.CachedSharePartition;
+import com.yammer.metrics.core.Meter;
+
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
/**
* Caches share sessions.
@@ -37,6 +41,17 @@ import java.util.TreeMap;
* must never be acquired while an individual ShareSession lock is already
held.
*/
public class ShareSessionCache {
+ // Visible for testing.
+ static final String SHARE_SESSIONS_COUNT = "ShareSessionsCount";
+ // Visible for testing.
+ static final String SHARE_PARTITIONS_COUNT = "SharePartitionsCount";
+ private static final String SHARE_SESSION_EVICTIONS_PER_SEC = "ShareSessionEvictionsPerSec";
+
+ /**
+ * Metric for the rate of eviction of share sessions.
+ */
+ private final Meter evictionsMeter;
+
private final int maxEntries;
private final long evictionMs;
private long numPartitions = 0;
@@ -47,9 +62,15 @@ public class ShareSessionCache {
// Maps last used times to sessions.
private final TreeMap<LastUsedKey, ShareSession> lastUsed = new
TreeMap<>();
+ @SuppressWarnings("this-escape")
public ShareSessionCache(int maxEntries, long evictionMs) {
this.maxEntries = maxEntries;
this.evictionMs = evictionMs;
+ // Register metrics for ShareSessionCache.
+ KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "ShareSessionCache");
+ metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
+ metricsGroup.newGauge(SHARE_PARTITIONS_COUNT, this::totalPartitions);
+ this.evictionsMeter = metricsGroup.newMeter(SHARE_SESSION_EVICTIONS_PER_SEC, "evictions", TimeUnit.SECONDS);
}
/**
@@ -136,6 +157,7 @@ public class ShareSessionCache {
} else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
ShareSession session = lastUsedEntry.getValue();
remove(session);
+ evictionsMeter.mark();
return true;
}
return false;
@@ -159,4 +181,9 @@ public class ShareSessionCache {
}
return null;
}
+
+ // Visible for testing.
+ Meter evictionsMeter() {
+ return evictionsMeter;
+ }
}
diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
index ef3aae2eee7..db7f15ef4c3 100644
--- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
+++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
@@ -23,8 +23,11 @@ import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestUtils;
+import com.yammer.metrics.core.Gauge;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
@@ -145,4 +148,32 @@ public class ShareFetchTestUtils {
List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() -
acquiredRecords.firstOffset() + 1)
);
}
+
+ /**
+ * Fetch the gauge value from the yammer metrics.
+ *
+ * @param name The name of the metric.
+ * @return The gauge value as a number.
+ */
+ public static Number yammerMetricValue(String name) {
+ try {
+ Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ .filter(e -> e.getKey().getMBeanName().contains(name))
+ .findFirst()
+ .orElseThrow()
+ .getValue();
+ return (Number) gauge.value();
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ /**
+ * Clear all the yammer metrics.
+ */
+ public static void clearYammerMetrics() {
+ KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().forEach(
+ metricName -> KafkaYammerMetrics.defaultRegistry().removeMetric(metricName)
+ );
+ }
}
diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index d5d42c5a3ec..4de1ffa4975 100644
--- a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -19,23 +19,31 @@ package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.clearYammerMetrics;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareSessionCacheTest {
+ @BeforeEach
+ public void setUp() {
+ clearYammerMetrics();
+ }
+
@Test
- public void testShareSessionCache() {
+ public void testShareSessionCache() throws InterruptedException {
ShareSessionCache cache = new ShareSessionCache(3, 100);
assertEquals(0, cache.size());
ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 0, mockedSharePartitionMap(10));
@@ -43,56 +51,101 @@ public class ShareSessionCacheTest {
ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 20, mockedSharePartitionMap(30));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30,
mockedSharePartitionMap(40)));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40,
mockedSharePartitionMap(5)));
- assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1,
key2, key3)));
+ assertShareCacheContains(cache, List.of(key1, key2, key3));
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
+ "Share session count should be 3.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60,
+ "Share partition count should be 60.");
+ assertEquals(0, cache.evictionsMeter().count());
+
+ // Touch the sessions to update the last used time, so that the key-2 can be evicted.
cache.touch(cache.get(key1), 200);
ShareSessionKey key4 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 210, mockedSharePartitionMap(11));
- assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1,
key3, key4)));
+ assertShareCacheContains(cache, List.of(key1, key3, key4));
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
+ "Share session count should be 3.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 51,
+ "Share partition count should be 51.");
+ assertEquals(1, cache.evictionsMeter().count());
+ assertTrue(cache.evictionsMeter().meanRate() > 0);
+
cache.touch(cache.get(key1), 400);
cache.touch(cache.get(key3), 390);
cache.touch(cache.get(key4), 400);
- ShareSessionKey key5 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 410, mockedSharePartitionMap(50));
- assertNull(key5);
+ // No key should be evicted as all the sessions are touched to latest time.
+ assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 410,
mockedSharePartitionMap(50)));
}
@Test
- public void testResizeCachedSessions() {
+ public void testResizeCachedSessions() throws InterruptedException {
ShareSessionCache cache = new ShareSessionCache(2, 100);
assertEquals(0, cache.size());
assertEquals(0, cache.totalPartitions());
ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
assertNotNull(key1);
- assertShareCacheContains(cache, new ArrayList<>(List.of(key1)));
+ assertShareCacheContains(cache, List.of(key1));
ShareSession session1 = cache.get(key1);
assertEquals(2, session1.size());
assertEquals(2, cache.totalPartitions());
assertEquals(1, cache.size());
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
+ "Share session count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 2,
+ "Share partition count should be 2.");
+ assertEquals(0, cache.evictionsMeter().count());
+
ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 0, mockedSharePartitionMap(4));
assertNotNull(key2);
- assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1,
key2)));
+ assertShareCacheContains(cache, List.of(key1, key2));
ShareSession session2 = cache.get(key2);
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
cache.touch(session1, 200);
cache.touch(session2, 200);
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 2,
+ "Share session count should be 2.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 6,
+ "Share partition count should be 6.");
+ assertEquals(0, cache.evictionsMeter().count());
+
ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 200, mockedSharePartitionMap(5));
assertNull(key3);
- assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1,
key2)));
+ assertShareCacheContains(cache, List.of(key1, key2));
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
cache.remove(key1);
- assertShareCacheContains(cache, new ArrayList<>(List.of(key2)));
+ assertShareCacheContains(cache, List.of(key2));
assertEquals(1, cache.size());
assertEquals(4, cache.totalPartitions());
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
+ "Share session count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 4,
+ "Share partition count should be 4.");
+ assertEquals(0, cache.evictionsMeter().count());
+
Iterator<CachedSharePartition> iterator =
session2.partitionMap().iterator();
iterator.next();
iterator.remove();
+ // Session size should get updated as it's backed by the partition map.
assertEquals(3, session2.size());
+ // Cached size should not get updated as it shall update on touch.
assertEquals(4, session2.cachedSize());
+ assertEquals(4, cache.totalPartitions());
+ // Touch the session to update the changes in cache and session's cached size.
cache.touch(session2, session2.lastUsedMs());
+ assertEquals(3, session2.cachedSize());
assertEquals(3, cache.totalPartitions());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
+ "Share session count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 3,
+ "Share partition count should be 3.");
+ assertEquals(0, cache.evictionsMeter().count());
}
private ImplicitLinkedHashCollection<CachedSharePartition>
mockedSharePartitionMap(int size) {
@@ -104,7 +157,7 @@ public class ShareSessionCacheTest {
}
private void assertShareCacheContains(ShareSessionCache cache,
- ArrayList<ShareSessionKey>
sessionKeys) {
+ List<ShareSessionKey> sessionKeys) {
int i = 0;
assertEquals(sessionKeys.size(), cache.size());
for (ShareSessionKey sessionKey : sessionKeys) {