AndrewJSchofield commented on code in PR #19329:
URL: https://github.com/apache/kafka/pull/19329#discussion_r2026942159
##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -885,6 +897,23 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
)
}
+ protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+ request: AbstractRequest,
+ destination: Int
+ )(implicit classTag: ClassTag[T]): T = {
+ val socket = IntegrationTestUtils.connect(brokerSocketServer(destination),
cluster.clientListener())
+ openSockets += socket
+ IntegrationTestUtils.sendAndReceive[T](request, socket)
+ }
+
+ protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+
request: AbstractRequest
Review Comment:
nit: Indentation should match the previous method.
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -171,19 +178,66 @@ public synchronized boolean tryEvict(long now) {
* @param partitionMap - The topic partitions to be added to the session.
* @return - The session key if the session was created, or null if the
session was not created.
Review Comment:
Please add `clientConnectionId` to the javadoc comment.
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -171,19 +178,66 @@ public synchronized boolean tryEvict(long now) {
* @param partitionMap - The topic partitions to be added to the session.
* @return - The session key if the session was created, or null if the
session was not created.
*/
- public synchronized ShareSessionKey maybeCreateSession(String groupId,
Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition>
partitionMap) {
+ public synchronized ShareSessionKey maybeCreateSession(
+ String groupId,
+ Uuid memberId,
+ long now,
+ ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
+ String clientConnectionId
+ ) {
if (sessions.size() < maxEntries || tryEvict(now)) {
ShareSession session = new ShareSession(new
ShareSessionKey(groupId, memberId), partitionMap,
now, now,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
sessions.put(session.key(), session);
touch(session, now);
+ sessionClientIdMapping.add(session.key(), clientConnectionId);
return session.key();
}
return null;
}
+ public ConnectionDisconnectListener connectionDisconnectListener() {
+ return connectionDisconnectListener;
+ }
+
// Visible for testing.
Meter evictionsMeter() {
return evictionsMeter;
}
+
+ private final class ClientConnectionDisconnectListener implements
ConnectionDisconnectListener {
+
+ // When the client disconnect, the corresponding session should be
removed from the cache.
Review Comment:
nit: "disconnects"
##########
server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java:
##########
@@ -148,6 +148,24 @@ public void testResizeCachedSessions() throws
InterruptedException {
assertEquals(0, cache.evictionsMeter().count());
}
+ @Test
+ public void testRemoveConnection() throws InterruptedException {
+ ShareSessionCache cache = new ShareSessionCache(3, 100);
+ assertEquals(0, cache.size());
+ ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 0, mockedSharePartitionMap(10), "conn-1");
+ ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 10, mockedSharePartitionMap(20), "conn-2");
+ ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 20, mockedSharePartitionMap(30), "conn-3");
+
+ // Since cache size is now equal to max entries allowed(3), no new
session can be created.
+ assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30,
mockedSharePartitionMap(40), "conn-4"));
+ assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40,
mockedSharePartitionMap(5), "conn-5"));
+ assertShareCacheContains(cache, List.of(key1, key2, key3));
+
+ // Simulating the disconnection of client with connection id conn-1
+ cache.connectionDisconnectListener().onDisconnect("conn-1");
+ assertShareCacheContains(cache, List.of(key2, key3));
+ }
Review Comment:
And now show that another session can be added to the cache once the disconnection has freed up an entry.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]