This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22c5794bc31 KAFKA-19159: Removed time based evictions for share 
sessions (#19500)
22c5794bc31 is described below

commit 22c5794bc314679bfae4d5edcb1909460e4bab7d
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Tue Apr 22 19:29:35 2025 +0530

    KAFKA-19159: Removed time based evictions for share sessions (#19500)
    
    Currently the share session cache is designed like the fetch session
    cache. If the cache is full and a new share session is trying to get
    initialized, then the sessions which haven't been touched for more than
    2 minutes are evicted. This wouldn't be right for share sessions as the
    members also hold locks on the acquired records, and session eviction
    would mean those locks will need to be dropped and the corresponding
    records re-delivered. This PR removes the time based eviction logic for
    share sessions.
    
    Refer: [KAFKA-19159](https://issues.apache.org/jira/browse/KAFKA-19159)
    
    Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../kafka/server/share/SharePartitionManager.java  |   7 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 .../server/share/SharePartitionManagerTest.java    | 119 +++------------------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  36 +++----
 .../kafka/server/share/session/LastUsedKey.java    |  66 ------------
 .../kafka/server/share/session/ShareSession.java   |  39 +++----
 .../server/share/session/ShareSessionCache.java    |  62 ++---------
 .../share/session/ShareSessionCacheTest.java       |  88 ++++++---------
 8 files changed, 84 insertions(+), 337 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 1b50a70c18c..a53f846a01c 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -448,8 +448,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                         ImplicitLinkedHashCollection<>(shareFetchData.size());
                 shareFetchData.forEach(topicIdPartition ->
                     cachedSharePartitions.mustAdd(new 
CachedSharePartition(topicIdPartition, false)));
-                ShareSessionKey responseShareSessionKey = 
cache.maybeCreateSession(groupId, reqMetadata.memberId(),
-                        time.milliseconds(), cachedSharePartitions);
+                ShareSessionKey responseShareSessionKey = 
cache.maybeCreateSession(groupId, reqMetadata.memberId(), 
cachedSharePartitions);
                 if (responseShareSessionKey == null) {
                     log.error("Could not create a share session for group {} 
member {}", groupId, reqMetadata.memberId());
                     throw Errors.SHARE_SESSION_NOT_FOUND.exception();
@@ -476,7 +475,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                 }
                 Map<ShareSession.ModifiedTopicIdPartitionType, 
List<TopicIdPartition>> modifiedTopicIdPartitions = shareSession.update(
                     shareFetchData, toForget);
-                cache.touch(shareSession, time.milliseconds());
+                cache.updateNumPartitions(shareSession);
                 shareSession.epoch = 
ShareRequestMetadata.nextEpoch(shareSession.epoch);
                 log.debug("Created a new ShareSessionContext for session key 
{}, epoch {}: " +
                                 "added {}, updated {}, removed {}", 
shareSession.key(), shareSession.epoch,
@@ -517,7 +516,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                             shareSession.epoch, reqMetadata.epoch());
                     throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
                 }
-                cache.touch(shareSession, time.milliseconds());
+                cache.updateNumPartitions(shareSession);
                 shareSession.epoch = 
ShareRequestMetadata.nextEpoch(shareSession.epoch);
             }
         }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8f9b983cb78..cbd2a91fbc5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -427,8 +427,8 @@ class BrokerServer(
       val fetchManager = new FetchManager(Time.SYSTEM, new 
FetchSessionCache(fetchSessionCacheShards))
 
       val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
-        config.shareGroupConfig.shareGroupMaxGroups * 
config.groupCoordinatorConfig.shareGroupMaxSize,
-        KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)
+        config.shareGroupConfig.shareGroupMaxGroups * 
config.groupCoordinatorConfig.shareGroupMaxSize
+      )
 
       sharePartitionManager = new SharePartitionManager(
         replicaManager,
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index a69c6c83071..83601a02201 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -121,7 +121,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -185,7 +184,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testNewContextReturnsFinalContextWithoutRequestData() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -212,7 +211,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testNewContextReturnsFinalContextWithRequestData() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -244,7 +243,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void 
testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequestData() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -275,7 +274,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testNewContext() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -372,100 +371,6 @@ public class SharePartitionManagerTest {
         assertEquals(0, cache.size());
     }
 
-    @Test
-    public void testShareSessionExpiration() {
-        ShareSessionCache cache = new ShareSessionCache(2, 1000);
-        sharePartitionManager = SharePartitionManagerBuilder.builder()
-            .withCache(cache)
-            .withTime(time)
-            .build();
-
-        Map<Uuid, String> topicNames = new HashMap<>();
-        Uuid fooId = Uuid.randomUuid();
-        topicNames.put(fooId, "foo");
-        TopicIdPartition foo0 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 0));
-        TopicIdPartition foo1 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 1));
-
-        // Create a new share session, session 1
-        List<TopicIdPartition> session1req = List.of(foo0, foo1);
-
-        String groupId = "grp";
-        ShareRequestMetadata reqMetadata1 = new 
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
-
-        ShareFetchContext session1context = 
sharePartitionManager.newContext(groupId, session1req, EMPTY_PART_LIST, 
reqMetadata1, false);
-        assertInstanceOf(ShareSessionContext.class, session1context);
-
-        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
respData1 = new LinkedHashMap<>();
-        respData1.put(foo0, new 
ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
-        respData1.put(foo1, new 
ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
-
-        ShareFetchResponse session1resp = 
session1context.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), 
respData1);
-        assertEquals(Errors.NONE, session1resp.error());
-        assertEquals(2, session1resp.responseData(topicNames).size());
-
-        ShareSessionKey session1Key = new ShareSessionKey(groupId, 
reqMetadata1.memberId());
-        // check share session entered into cache
-        assertNotNull(cache.get(session1Key));
-
-        time.sleep(500);
-
-        // Create a second new share session
-        List<TopicIdPartition> session2req = List.of(foo0, foo1);
-
-        ShareRequestMetadata reqMetadata2 = new 
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
-
-        ShareFetchContext session2context = 
sharePartitionManager.newContext(groupId, session2req, EMPTY_PART_LIST, 
reqMetadata2, false);
-        assertInstanceOf(ShareSessionContext.class, session2context);
-
-        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
respData2 = new LinkedHashMap<>();
-        respData2.put(foo0, new 
ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
-        respData2.put(foo1, new 
ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
-
-        ShareFetchResponse session2resp = 
session2context.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), 
respData2);
-        assertEquals(Errors.NONE, session2resp.error());
-        assertEquals(2, session2resp.responseData(topicNames).size());
-
-        ShareSessionKey session2Key = new ShareSessionKey(groupId, 
reqMetadata2.memberId());
-
-        // both newly created entries are present in cache
-        assertNotNull(cache.get(session1Key));
-        assertNotNull(cache.get(session2Key));
-
-        time.sleep(500);
-
-        // Create a subsequent share fetch context for session 1
-        ShareFetchContext session1context2 = 
sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
-            new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
-        assertInstanceOf(ShareSessionContext.class, session1context2);
-
-        // total sleep time will now be large enough that share session 1 will 
be evicted if not correctly touched
-        time.sleep(501);
-
-        // create one final share session to test that the least recently used 
entry is evicted
-        // the second share session should be evicted because the first share 
session was incrementally fetched
-        // more recently than the second session was created
-        List<TopicIdPartition> session3req = List.of(foo0, foo1);
-
-        ShareRequestMetadata reqMetadata3 = new 
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
-
-        ShareFetchContext session3context = 
sharePartitionManager.newContext(groupId, session3req, EMPTY_PART_LIST, 
reqMetadata3, false);
-
-        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
respData3 = new LinkedHashMap<>();
-        respData3.put(foo0, new 
ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
-        respData3.put(foo1, new 
ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
-
-        ShareFetchResponse session3resp = 
session3context.updateAndGenerateResponseData(groupId, reqMetadata3.memberId(), 
respData3);
-        assertEquals(Errors.NONE, session3resp.error());
-        assertEquals(2, session3resp.responseData(topicNames).size());
-
-        ShareSessionKey session3Key = new ShareSessionKey(groupId, 
reqMetadata3.memberId());
-
-        assertNotNull(cache.get(session1Key));
-        assertNull(cache.get(session2Key), "share session 2 should have been 
evicted by latest share session, " +
-            "as share session 1 was used more recently");
-        assertNotNull(cache.get(session3Key));
-    }
-
     @Test
     public void testSubsequentShareSession() {
         sharePartitionManager = SharePartitionManagerBuilder.builder().build();
@@ -530,7 +435,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testZeroSizeShareSession() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -576,7 +481,7 @@ public class SharePartitionManagerTest {
     @Test
     public void testToForgetPartitions() {
         String groupId = "grp";
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -614,7 +519,7 @@ public class SharePartitionManagerTest {
     @Test
     public void testShareSessionUpdateTopicIdsBrokerSide() {
         String groupId = "grp";
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -665,7 +570,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testGetErroneousAndValidTopicIdPartitions() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -758,7 +663,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testShareFetchContextResponseSize() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -859,7 +764,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testCachedTopicPartitionsWithNoTopicPartitions() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -870,7 +775,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testCachedTopicPartitionsForValidShareSessions() {
-        ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        ShareSessionCache cache = new ShareSessionCache(10);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -3108,7 +3013,7 @@ public class SharePartitionManagerTest {
         private final Persister persister = new NoOpStatePersister();
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
         private Time time = new MockTime();
-        private ShareSessionCache cache = new ShareSessionCache(10, 1000);
+        private ShareSessionCache cache = new ShareSessionCache(10);
         private Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
         private Timer timer = new MockTimer();
         private ShareGroupMetrics shareGroupMetrics = new 
ShareGroupMetrics(time);
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 84e1f1ebb67..02541097d4c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4046,9 +4046,8 @@ class KafkaApisTest extends Logging {
 
     when(sharePartitionManager.newContext(any(), any(), any(), any(), 
any())).thenThrow(
       Errors.INVALID_REQUEST.exception()
-    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1), new ShareSession(
-      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2
-    )))
+    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
+      new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions, 2)))
 
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4299,9 +4298,8 @@ class KafkaApisTest extends Logging {
       new TopicIdPartition(topicId, partitionIndex, topicName), false))
 
     when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
-      .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1), new ShareSession(
-        new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 
2))
-      )
+      .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1),
+        new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions, 2)))
 
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4361,9 +4359,8 @@ class KafkaApisTest extends Logging {
       new TopicIdPartition(topicId, partitionIndex, topicName), false))
 
     when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
-      .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1), new ShareSession(
-        new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 
2))
-      )
+      .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1),
+        new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions, 2)))
 
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4718,10 +4715,10 @@ class KafkaApisTest extends Logging {
       new ShareSessionContext(new ShareRequestMetadata(memberId, 0), 
util.List.of(
         new TopicIdPartition(topicId, partitionIndex, topicName)
       ))
-    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1), new ShareSession(
-      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 
2))
-    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
2), new ShareSession(
-      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 10L, 
3))
+    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
+      new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions, 2))
+    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
+      new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions, 3))
     )
 
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
@@ -4986,10 +4983,10 @@ class KafkaApisTest extends Logging {
         new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0)),
         new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
       ))
-    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1), new ShareSession(
-      new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 0L, 0L, 
2))
-    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
2), new ShareSession(
-      new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 0L, 0L, 
3))
+    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
+      new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions1, 2))
+    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
+      new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions2, 3))
     ).thenReturn(new FinalContext())
 
     when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
@@ -5963,9 +5960,8 @@ class KafkaApisTest extends Logging {
       new ShareSessionContext(new ShareRequestMetadata(memberId, 0), 
util.List.of(
         new TopicIdPartition(topicId, partitionIndex, topicName)
       ))
-    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 
1), new ShareSession(
-      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 
2))
-    )
+    ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
+      new ShareSession(new ShareSessionKey(groupId, memberId), 
cachedSharePartitions, 2)))
 
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java 
b/server/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
deleted file mode 100644
index e6084f8e9b5..00000000000
--- 
a/server/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.server.share.session;
-
-import java.util.Objects;
-
-public class LastUsedKey implements Comparable<LastUsedKey> {
-    private final ShareSessionKey key;
-    private final long lastUsedMs;
-
-    public LastUsedKey(ShareSessionKey key, long lastUsedMs) {
-        this.key = key;
-        this.lastUsedMs = lastUsedMs;
-    }
-
-    public ShareSessionKey key() {
-        return key;
-    }
-
-    public long lastUsedMs() {
-        return lastUsedMs;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(key, lastUsedMs);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        LastUsedKey other = (LastUsedKey) obj;
-        return lastUsedMs == other.lastUsedMs && Objects.equals(key, 
other.key);
-    }
-
-    @Override
-    public int compareTo(LastUsedKey other) {
-        int res = Long.compare(lastUsedMs, other.lastUsedMs);
-        if (res != 0)
-            return res;
-        return Integer.compare(key.hashCode(), other.key.hashCode());
-    }
-}
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
index 362f32e6197..5cb800c5524 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
@@ -38,9 +38,7 @@ public class ShareSession {
 
     private final ShareSessionKey key;
     private final ImplicitLinkedHashCollection<CachedSharePartition> 
partitionMap;
-    private final long creationMs;
 
-    private long lastUsedMs;
     // visible for testing
     public int epoch;
     // This is used by the ShareSessionCache to store the last known size of 
this session.
@@ -54,17 +52,11 @@ public class ShareSession {
      *
      * @param key                The share session key to identify the share 
session uniquely.
      * @param partitionMap       The CachedPartitionMap.
-     * @param creationMs         The time in milliseconds when this share 
session was created.
-     * @param lastUsedMs         The last used time in milliseconds. This 
should only be updated by
-     *                           ShareSessionCache#touch.
      * @param epoch              The share session sequence number.
      */
-    public ShareSession(ShareSessionKey key, 
ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
-                        long creationMs, long lastUsedMs, int epoch) {
+    public ShareSession(ShareSessionKey key, 
ImplicitLinkedHashCollection<CachedSharePartition> partitionMap, int epoch) {
         this.key = key;
         this.partitionMap = partitionMap;
-        this.creationMs = creationMs;
-        this.lastUsedMs = lastUsedMs;
         this.epoch = epoch;
     }
 
@@ -76,18 +68,6 @@ public class ShareSession {
         return cachedSize;
     }
 
-    public synchronized void cachedSize(int size) {
-        cachedSize = size;
-    }
-
-    public synchronized long lastUsedMs() {
-        return lastUsedMs;
-    }
-
-    public synchronized void lastUsedMs(long ts) {
-        lastUsedMs = ts;
-    }
-
     public synchronized ImplicitLinkedHashCollection<CachedSharePartition> 
partitionMap() {
         return partitionMap;
     }
@@ -105,10 +85,6 @@ public class ShareSession {
         return partitionMap.isEmpty();
     }
 
-    public synchronized LastUsedKey lastUsedKey() {
-        return new LastUsedKey(key, lastUsedMs);
-    }
-
     // Update the cached partition data based on the request.
     public synchronized Map<ModifiedTopicIdPartitionType, 
List<TopicIdPartition>> update(
         List<TopicIdPartition> shareFetchData,
@@ -138,6 +114,17 @@ public class ShareSession {
         return result;
     }
 
+    /**
+     * Updates the cached size of the session to represent the current 
partitionMap size.
+     * @return The difference between the current cached size and the 
previously stored cached size. This is required to
+     *         update the total number of share partitions stored in the share 
session cache.
+     */
+    public synchronized int updateCachedSize() {
+        var previousSize = cachedSize;
+        cachedSize = partitionMap.size();
+        return previousSize != -1 ? cachedSize - previousSize : cachedSize;
+    }
+
     public static String partitionsToLogString(Collection<TopicIdPartition> 
partitions, Boolean traceEnabled) {
         if (traceEnabled) {
             return String.format("( %s )", String.join(", ", 
partitions.toString()));
@@ -149,8 +136,6 @@ public class ShareSession {
         return "ShareSession(" +
                 "key=" + key +
                 ", partitionMap=" + partitionMap +
-                ", creationMs=" + creationMs +
-                ", lastUsedMs=" + lastUsedMs +
                 ", epoch=" + epoch +
                 ", cachedSize=" + cachedSize +
                 ")";
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index dc870fc9c25..0b06ea535be 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -27,7 +27,6 @@ import com.yammer.metrics.core.Meter;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -53,19 +52,14 @@ public class ShareSessionCache {
     private final Meter evictionsMeter;
 
     private final int maxEntries;
-    private final long evictionMs;
     private long numPartitions = 0;
 
     // A map of session key to ShareSession.
     private final Map<ShareSessionKey, ShareSession> sessions = new 
HashMap<>();
 
-    // Maps last used times to sessions.
-    private final TreeMap<LastUsedKey, ShareSession> lastUsed = new 
TreeMap<>();
-
     @SuppressWarnings("this-escape")
-    public ShareSessionCache(int maxEntries, long evictionMs) {
+    public ShareSessionCache(int maxEntries) {
         this.maxEntries = maxEntries;
-        this.evictionMs = evictionMs;
         // Register metrics for ShareSessionCache.
         KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", 
"ShareSessionCache");
         metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
@@ -108,9 +102,6 @@ public class ShareSessionCache {
      * @return The removed session, or None if there was no such session.
      */
     public synchronized ShareSession remove(ShareSession session) {
-        synchronized (session) {
-            lastUsed.remove(session.lastUsedKey());
-        }
         ShareSession removeResult = sessions.remove(session.key());
         if (removeResult != null) {
             numPartitions = numPartitions - session.cachedSize();
@@ -119,64 +110,27 @@ public class ShareSessionCache {
     }
 
     /**
-     * Update a session's position in the lastUsed tree.
+     * Update the size of the cache by updating the total number of share 
partitions.
      *
      * @param session  The session.
-     * @param now      The current time in milliseconds.
      */
-    public synchronized void touch(ShareSession session, long now) {
-        synchronized (session) {
-            // Update the lastUsed map.
-            lastUsed.remove(session.lastUsedKey());
-            session.lastUsedMs(now);
-            lastUsed.put(session.lastUsedKey(), session);
-
-            int oldSize = session.cachedSize();
-            if (oldSize != -1) {
-                numPartitions = numPartitions - oldSize;
-            }
-            session.cachedSize(session.size());
-            numPartitions = numPartitions + session.cachedSize();
-        }
-    }
-
-    /**
-     * Try to evict an entry from the session cache.
-     * <p>
-     * A proposed new element A may evict an existing element B if:
-     * B is considered "stale" because it has been inactive for a long time.
-     *
-     * @param now        The current time in milliseconds.
-     * @return           True if an entry was evicted; false otherwise.
-     */
-    public synchronized boolean tryEvict(long now) {
-        // Try to evict an entry which is stale.
-        Map.Entry<LastUsedKey, ShareSession> lastUsedEntry = 
lastUsed.firstEntry();
-        if (lastUsedEntry == null) {
-            return false;
-        } else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
-            ShareSession session = lastUsedEntry.getValue();
-            remove(session);
-            evictionsMeter.mark();
-            return true;
-        }
-        return false;
+    public synchronized void updateNumPartitions(ShareSession session) {
+        numPartitions += session.updateCachedSize();
     }
 
     /**
      * Maybe create a new session and add it to the cache.
      * @param groupId - The group id in the share fetch request.
      * @param memberId - The member id in the share fetch request.
-     * @param now - The current time in milliseconds.
      * @param partitionMap - The topic partitions to be added to the session.
      * @return - The session key if the session was created, or null if the 
session was not created.
      */
-    public synchronized ShareSessionKey maybeCreateSession(String groupId, 
Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition> 
partitionMap) {
-        if (sessions.size() < maxEntries || tryEvict(now)) {
+    public synchronized ShareSessionKey maybeCreateSession(String groupId, 
Uuid memberId, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap) 
{
+        if (sessions.size() < maxEntries) {
             ShareSession session = new ShareSession(new 
ShareSessionKey(groupId, memberId), partitionMap,
-                    now, now, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
+                
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
             sessions.put(session.key(), session);
-            touch(session, now);
+            updateNumPartitions(session);
             return session.key();
         }
         return null;
diff --git 
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
 
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index 4de1ffa4975..ca18de5b65c 100644
--- 
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -33,7 +33,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareSessionCacheTest {
 
@@ -44,46 +43,24 @@ public class ShareSessionCacheTest {
 
     @Test
     public void testShareSessionCache() throws InterruptedException {
-        ShareSessionCache cache = new ShareSessionCache(3, 100);
+        ShareSessionCache cache = new ShareSessionCache(3);
         assertEquals(0, cache.size());
-        ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 0, mockedSharePartitionMap(10));
-        ShareSessionKey key2 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 10, mockedSharePartitionMap(20));
-        ShareSessionKey key3 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 20, mockedSharePartitionMap(30));
-        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, 
mockedSharePartitionMap(40)));
-        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, 
mockedSharePartitionMap(5)));
+        ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(10));
+        ShareSessionKey key2 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(20));
+        ShareSessionKey key3 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(30));
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 
mockedSharePartitionMap(40)));
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 
mockedSharePartitionMap(5)));
         assertShareCacheContains(cache, List.of(key1, key2, key3));
 
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
-            "Share session count should be 3.");
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60,
-            "Share partition count should be 60.");
-        assertEquals(0, cache.evictionsMeter().count());
-
-        // Touch the sessions to update the last used time, so that the key-2 
can be evicted.
-        cache.touch(cache.get(key1), 200);
-        ShareSessionKey key4 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 210, mockedSharePartitionMap(11));
-        assertShareCacheContains(cache, List.of(key1, key3, key4));
-
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
-            "Share session count should be 3.");
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 51,
-            "Share partition count should be 51.");
-        assertEquals(1, cache.evictionsMeter().count());
-        assertTrue(cache.evictionsMeter().meanRate() > 0);
-
-        cache.touch(cache.get(key1), 400);
-        cache.touch(cache.get(key3), 390);
-        cache.touch(cache.get(key4), 400);
-        // No key should be evicted as all the sessions are touched to latest 
time.
-        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, 
mockedSharePartitionMap(50)));
+        assertMetricsValues(3, 60, 0, cache);
     }
 
     @Test
     public void testResizeCachedSessions() throws InterruptedException {
-        ShareSessionCache cache = new ShareSessionCache(2, 100);
+        ShareSessionCache cache = new ShareSessionCache(2);
         assertEquals(0, cache.size());
         assertEquals(0, cache.totalPartitions());
-        ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
+        ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(2));
         assertNotNull(key1);
         assertShareCacheContains(cache, List.of(key1));
         ShareSession session1 = cache.get(key1);
@@ -91,28 +68,20 @@ public class ShareSessionCacheTest {
         assertEquals(2, cache.totalPartitions());
         assertEquals(1, cache.size());
 
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
-            "Share session count should be 1.");
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 2,
-            "Share partition count should be 2.");
-        assertEquals(0, cache.evictionsMeter().count());
+        assertMetricsValues(1, 2, 0, cache);
 
-        ShareSessionKey key2 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 0, mockedSharePartitionMap(4));
+        ShareSessionKey key2 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(4));
         assertNotNull(key2);
         assertShareCacheContains(cache, List.of(key1, key2));
         ShareSession session2 = cache.get(key2);
         assertEquals(6, cache.totalPartitions());
         assertEquals(2, cache.size());
-        cache.touch(session1, 200);
-        cache.touch(session2, 200);
+        cache.updateNumPartitions(session1);
+        cache.updateNumPartitions(session2);
 
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 2,
-            "Share session count should be 2.");
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 6,
-            "Share partition count should be 6.");
-        assertEquals(0, cache.evictionsMeter().count());
+        assertMetricsValues(2, 6, 0, cache);
 
-        ShareSessionKey key3 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 200, mockedSharePartitionMap(5));
+        ShareSessionKey key3 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(5));
         assertNull(key3);
         assertShareCacheContains(cache, List.of(key1, key2));
         assertEquals(6, cache.totalPartitions());
@@ -122,11 +91,7 @@ public class ShareSessionCacheTest {
         assertEquals(1, cache.size());
         assertEquals(4, cache.totalPartitions());
 
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
-            "Share session count should be 1.");
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 4,
-            "Share partition count should be 4.");
-        assertEquals(0, cache.evictionsMeter().count());
+        assertMetricsValues(1, 4, 0, cache);
 
         Iterator<CachedSharePartition> iterator = 
session2.partitionMap().iterator();
         iterator.next();
@@ -137,15 +102,11 @@ public class ShareSessionCacheTest {
         assertEquals(4, session2.cachedSize());
         assertEquals(4, cache.totalPartitions());
         // Touch the session to update the changes in cache and session's 
cached size.
-        cache.touch(session2, session2.lastUsedMs());
+        cache.updateNumPartitions(session2);
         assertEquals(3, session2.cachedSize());
         assertEquals(3, cache.totalPartitions());
 
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 1,
-            "Share session count should be 1.");
-        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 3,
-            "Share partition count should be 3.");
-        assertEquals(0, cache.evictionsMeter().count());
+        assertMetricsValues(1, 3, 0, cache);
     }
 
     private ImplicitLinkedHashCollection<CachedSharePartition> 
mockedSharePartitionMap(int size) {
@@ -165,4 +126,17 @@ public class ShareSessionCacheTest {
                     "Missing session " + ++i + " out of " + sessionKeys.size() 
+ " ( " + sessionKey + " )");
         }
     }
+
+    private void assertMetricsValues(
+        int shareSessionsCount,
+        int sharePartitionsCount,
+        int evictionsCount,
+        ShareSessionCache cache
+    ) throws InterruptedException {
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 
shareSessionsCount,
+            "Share session count should be " + shareSessionsCount);
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 
sharePartitionsCount,
+            "Share partition count should be " + sharePartitionsCount);
+        assertEquals(evictionsCount, cache.evictionsMeter().count());
+    }
 }


Reply via email to