This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a6c53d0c37f KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) (#19059)
a6c53d0c37f is described below

commit a6c53d0c37f10d5e69c2b07db8b5445e00a70d24
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Mar 3 16:44:34 2025 +0000

    KAFKA-18878: Added share session cache and delayed share fetch metrics (KIP-1103) (#19059)
    
    The PR implements the ShareSessionCache and DelayedShareFetchMetrics as
    defined in KIP-1103.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java | 19 ++++++
 .../kafka/server/share/DelayedShareFetchTest.java  | 18 +++++
 .../kafka/server/share/SharePartitionTest.java     | 17 +----
 .../server/share/session/ShareSessionCache.java    | 27 ++++++++
 .../server/share/fetch/ShareFetchTestUtils.java    | 31 +++++++++
 .../share/session/ShareSessionCacheTest.java       | 79 ++++++++++++++++++----
 6 files changed, 162 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index fb5c05a4c75..80399f4e05c 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperation;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -36,6 +37,8 @@ import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
 
+import com.yammer.metrics.core.Meter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -61,6 +65,8 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
 
+    private static final String EXPIRES_PER_SEC = "ExpiresPerSec";
+
     private final ShareFetch shareFetch;
     private final ReplicaManager replicaManager;
     private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
@@ -70,6 +76,10 @@ public class DelayedShareFetch extends DelayedOperation {
     // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
     // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
     private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
+    /**
+     * Metric for the rate of expired delayed fetch requests.
+     */
+    private final Meter expiredRequestMeter;
     // Tracks the start time to acquire any share partition for a fetch request.
     private long acquireStartTimeMs;
     private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
@@ -124,10 +134,14 @@ public class DelayedShareFetch extends DelayedOperation {
         this.shareGroupMetrics = shareGroupMetrics;
         this.time = time;
         this.acquireStartTimeMs = time.hiResClockMs();
+        // Register metrics for DelayedShareFetch.
+        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics");
+        this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS);
     }
 
     @Override
     public void onExpiration() {
+        expiredRequestMeter.mark();
     }
 
     /**
@@ -514,4 +528,9 @@ public class DelayedShareFetch extends DelayedOperation {
     Lock lock() {
         return lock;
     }
+
+    // Visible for testing.
+    Meter expiredRequestMeter() {
+        return expiredRequestMeter;
+    }
 }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 7eb6584bed7..8669474a80c 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -154,6 +154,7 @@ public class DelayedShareFetchTest {
         // Metrics shall not be recorded as no partition is acquired.
         assertNull(shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId));
         assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
+        assertEquals(0, delayedShareFetch.expiredRequestMeter().count());
 
         delayedShareFetch.lock().unlock();
     }
@@ -1118,6 +1119,23 @@ public class DelayedShareFetchTest {
             true);
     }
 
+    @Test
+    public void testOnCompleteExecutionOnTimeout() {
+        ShareFetch shareFetch = new ShareFetch(
+            FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), new LinkedHashMap<>(), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .build();
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(shareFetch.isCompleted());
+        // Call run to execute onComplete and onExpiration.
+        delayedShareFetch.run();
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
+    }
+
     static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
         LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes);
         LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index a2c6e7ed30b..7f2ec081885 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -48,7 +48,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
-import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
@@ -67,8 +66,6 @@ import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.storage.internals.log.OffsetResultHolder;
 import org.apache.kafka.test.TestUtils;
 
-import com.yammer.metrics.core.Gauge;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -91,6 +88,7 @@ import java.util.concurrent.TimeUnit;
 
 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
 import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -6736,19 +6734,6 @@ public class SharePartitionTest {
         Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
     }
 
-    private Number yammerMetricValue(String name) {
-        try {
-            Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
-                .filter(e -> e.getKey().getMBeanName().contains(name))
-                .findFirst()
-                .orElseThrow()
-                .getValue();
-            return (Number) gauge.value();
-        } catch (Exception e) {
-            return 0;
-        }
-    }
-
     private static class SharePartitionBuilder {
 
         private int defaultAcquisitionLockTimeoutMs = 30000;
diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index 28df334d0ac..dc870fc9c25 100644
--- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
+++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -20,11 +20,15 @@ package org.apache.kafka.server.share.session;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.requests.ShareRequestMetadata;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.share.CachedSharePartition;
 
+import com.yammer.metrics.core.Meter;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Caches share sessions.
@@ -37,6 +41,17 @@ import java.util.TreeMap;
  * must never be acquired while an individual ShareSession lock is already held.
  */
 public class ShareSessionCache {
+    // Visible for testing.
+    static final String SHARE_SESSIONS_COUNT = "ShareSessionsCount";
+    // Visible for testing.
+    static final String SHARE_PARTITIONS_COUNT = "SharePartitionsCount";
+    private static final String SHARE_SESSION_EVICTIONS_PER_SEC = "ShareSessionEvictionsPerSec";
+
+    /**
+     * Metric for the rate of eviction of share sessions.
+     */
+    private final Meter evictionsMeter;
+
     private final int maxEntries;
     private final long evictionMs;
     private long numPartitions = 0;
@@ -47,9 +62,15 @@ public class ShareSessionCache {
     // Maps last used times to sessions.
     private final TreeMap<LastUsedKey, ShareSession> lastUsed = new TreeMap<>();
 
+    @SuppressWarnings("this-escape")
     public ShareSessionCache(int maxEntries, long evictionMs) {
         this.maxEntries = maxEntries;
         this.evictionMs = evictionMs;
+        // Register metrics for ShareSessionCache.
+        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "ShareSessionCache");
+        metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
+        metricsGroup.newGauge(SHARE_PARTITIONS_COUNT, this::totalPartitions);
+        this.evictionsMeter = metricsGroup.newMeter(SHARE_SESSION_EVICTIONS_PER_SEC, "evictions", TimeUnit.SECONDS);
     }
 
     /**
@@ -136,6 +157,7 @@ public class ShareSessionCache {
         } else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
             ShareSession session = lastUsedEntry.getValue();
             remove(session);
+            evictionsMeter.mark();
             return true;
         }
         return false;
@@ -159,4 +181,9 @@ public class ShareSessionCache {
         }
         return null;
     }
+
+    // Visible for testing.
+    Meter evictionsMeter() {
+        return evictionsMeter;
+    }
 }
diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
index ef3aae2eee7..db7f15ef4c3 100644
--- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
+++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java
@@ -23,8 +23,11 @@ import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.test.TestUtils;
 
+import com.yammer.metrics.core.Gauge;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
@@ -145,4 +148,32 @@ public class ShareFetchTestUtils {
             List.of(acquiredRecords), (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1)
         );
     }
+
+    /**
+     * Fetch the gauge value from the yammer metrics.
+     *
+     * @param name The name of the metric.
+     * @return The gauge value as a number.
+     */
+    public static Number yammerMetricValue(String name) {
+        try {
+            Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+                .filter(e -> e.getKey().getMBeanName().contains(name))
+                .findFirst()
+                .orElseThrow()
+                .getValue();
+            return (Number) gauge.value();
+        } catch (Exception e) {
+            return 0;
+        }
+    }
+
+    /**
+     * Clear all the yammer metrics.
+     */
+    public static void clearYammerMetrics() {
+        KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().forEach(
+            metricName -> KafkaYammerMetrics.defaultRegistry().removeMetric(metricName)
+        );
+    }
 }
diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index d5d42c5a3ec..4de1ffa4975 100644
--- a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -19,23 +19,31 @@ package org.apache.kafka.server.share.session;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.clearYammerMetrics;
+import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareSessionCacheTest {
 
+    @BeforeEach
+    public void setUp() {
+        clearYammerMetrics();
+    }
+
     @Test
-    public void testShareSessionCache() {
+    public void testShareSessionCache() throws InterruptedException {
         ShareSessionCache cache = new ShareSessionCache(3, 100);
         assertEquals(0, cache.size());
         ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10));
@@ -43,56 +51,101 @@ public class ShareSessionCacheTest {
         ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30));
         assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40)));
         assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5)));
-        assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2, key3)));
+        assertShareCacheContains(cache, List.of(key1, key2, key3));
+
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
+            "Share session count should be 3.");
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60,
+            "Share partition count should be 60.");
+        assertEquals(0, cache.evictionsMeter().count());
+
+        // Touch the sessions to update the last used time, so that the key-2 can be evicted.
         cache.touch(cache.get(key1), 200);
         ShareSessionKey key4 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 210, mockedSharePartitionMap(11));
-        assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key3, key4)));
+        assertShareCacheContains(cache, List.of(key1, key3, key4));
+
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
+            "Share session count should be 3.");
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 51,
+            "Share partition count should be 51.");
+        assertEquals(1, cache.evictionsMeter().count());
+        assertTrue(cache.evictionsMeter().meanRate() > 0);
+
         cache.touch(cache.get(key1), 400);
         cache.touch(cache.get(key3), 390);
         cache.touch(cache.get(key4), 400);
-        ShareSessionKey key5 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, mockedSharePartitionMap(50));
-        assertNull(key5);
+        // No key should be evicted as all the sessions are touched to latest time.
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, mockedSharePartitionMap(50)));
     }
 
     @Test
-    public void testResizeCachedSessions() {
+    public void testResizeCachedSessions() throws InterruptedException {
         ShareSessionCache cache = new ShareSessionCache(2, 100);
         assertEquals(0, cache.size());
         assertEquals(0, cache.totalPartitions());
         ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
         assertNotNull(key1);
-        assertShareCacheContains(cache, new ArrayList<>(List.of(key1)));
+        assertShareCacheContains(cache, List.of(key1));
         ShareSession session1 = cache.get(key1);
         assertEquals(2, session1.size());
         assertEquals(2, cache.totalPartitions());
         assertEquals(1, cache.size());
 
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
+            "Share session count should be 1.");
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 2,
+            "Share partition count should be 2.");
+        assertEquals(0, cache.evictionsMeter().count());
+
         ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(4));
         assertNotNull(key2);
-        assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2)));
+        assertShareCacheContains(cache, List.of(key1, key2));
         ShareSession session2 = cache.get(key2);
         assertEquals(6, cache.totalPartitions());
         assertEquals(2, cache.size());
         cache.touch(session1, 200);
         cache.touch(session2, 200);
 
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 2,
+            "Share session count should be 2.");
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 6,
+            "Share partition count should be 6.");
+        assertEquals(0, cache.evictionsMeter().count());
+
         ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 200, mockedSharePartitionMap(5));
         assertNull(key3);
-        assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2)));
+        assertShareCacheContains(cache, List.of(key1, key2));
         assertEquals(6, cache.totalPartitions());
         assertEquals(2, cache.size());
         cache.remove(key1);
-        assertShareCacheContains(cache, new ArrayList<>(List.of(key2)));
+        assertShareCacheContains(cache, List.of(key2));
         assertEquals(1, cache.size());
         assertEquals(4, cache.totalPartitions());
 
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
+            "Share session count should be 1.");
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 4,
+            "Share partition count should be 4.");
+        assertEquals(0, cache.evictionsMeter().count());
+
         Iterator<CachedSharePartition> iterator = session2.partitionMap().iterator();
         iterator.next();
         iterator.remove();
+        // Session size should get updated as it's backed by the partition map.
         assertEquals(3, session2.size());
+        // Cached size should not get updated as it shall update on touch.
         assertEquals(4, session2.cachedSize());
+        assertEquals(4, cache.totalPartitions());
+        // Touch the session to update the changes in cache and session's cached size.
         cache.touch(session2, session2.lastUsedMs());
+        assertEquals(3, session2.cachedSize());
         assertEquals(3, cache.totalPartitions());
+
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
+            "Share session count should be 1.");
+        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 3,
+            "Share partition count should be 3.");
+        assertEquals(0, cache.evictionsMeter().count());
     }
 
     private ImplicitLinkedHashCollection<CachedSharePartition> mockedSharePartitionMap(int size) {
@@ -104,7 +157,7 @@ public class ShareSessionCacheTest {
     }
 
     private void assertShareCacheContains(ShareSessionCache cache,
-                                         ArrayList<ShareSessionKey> sessionKeys) {
+                                         List<ShareSessionKey> sessionKeys) {
         int i = 0;
         assertEquals(sessionKeys.size(), cache.size());
         for (ShareSessionKey sessionKey : sessionKeys) {

Reply via email to