This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 800612e4a7a KAFKA-19015: Remove share session from cache on share
consumer connection drop (#19329)
800612e4a7a is described below
commit 800612e4a7ae5511d20816de73ae6cea0596edfc
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Thu May 1 19:06:18 2025 +0530
KAFKA-19015: Remove share session from cache on share consumer connection
drop (#19329)
Up till now, the share sessions in the broker were only attempted to
evict when the share session cache was full and a new session was trying
to get registered. With the changes in this PR, whenever a share
consumer gets disconnected from the broker, the corresponding share
session would be evicted from the cache.
Note - `connectAndReceiveWithoutClosingSocket` has been introduced in
`GroupCoordinatorBaseRequestTest`. This method creates a socket
connection, sends the request, receives a response but does not close
the connection. Instead, these sockets are stored in a ListBuffer
`openSockets`, which are closed in tearDown method after each test is
run. Also, all the `connectAndReceive` calls in
`ShareFetchAcknowledgeRequestTest` have been replaced by
`connectAndReceiveWithoutClosingSocket`, because these tests depends
upon the persistence of the share sessions on the broker once
registered. But, with the new code introduced, as soon as the socket
connection is closed, a connection drop is assumed by the broker,
leading to session eviction.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 52 ++++++
.../kafka/server/share/SharePartitionManager.java | 16 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 10 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../server/share/SharePartitionManagerTest.java | 93 +++++-----
.../server/GroupCoordinatorBaseRequestTest.scala | 29 +++
.../scala/unit/kafka/server/KafkaApisTest.scala | 62 ++++---
.../server/ShareFetchAcknowledgeRequestTest.scala | 200 +++++++++++++++------
.../server/ShareGroupHeartbeatRequestTest.scala | 74 ++++++++
.../server/share/session/ShareSessionCache.java | 35 +++-
.../share/session/ShareSessionCacheTest.java | 46 ++++-
11 files changed, 467 insertions(+), 152 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 4aa08032463..51ec647fd7c 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
@@ -2057,6 +2058,57 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
+ @ClusterTest(
+ brokers = 1,
+ serverProperties = {
+ @ClusterConfigProperty(key = "auto.create.topics.enable", value =
"false"),
+ @ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+ @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+ @ClusterConfigProperty(key =
"group.share.partition.max.record.locks", value = "10000"),
+ @ClusterConfigProperty(key =
"group.share.record.lock.duration.ms", value = "15000"),
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ @ClusterConfigProperty(key =
"share.coordinator.state.topic.min.isr", value = "1"),
+ @ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "3"),
+ @ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
+ @ClusterConfigProperty(key = "transaction.state.log.min.isr",
value = "1"),
+ @ClusterConfigProperty(key =
"transaction.state.log.replication.factor", value = "1"),
+ @ClusterConfigProperty(key = "group.share.max.size", value = "3")
// Setting max group size to 3
+ }
+ )
+ public void testShareGroupMaxSizeConfigExceeded() throws Exception {
+ // creating 3 consumers in the group1
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumer3 =
createShareConsumer("group1");
+
+ shareConsumer1.subscribe(Set.of(tp.topic()));
+ shareConsumer2.subscribe(Set.of(tp.topic()));
+ shareConsumer3.subscribe(Set.of(tp.topic()));
+
+ shareConsumer1.poll(Duration.ofMillis(5000));
+ shareConsumer2.poll(Duration.ofMillis(5000));
+ shareConsumer3.poll(Duration.ofMillis(5000));
+
+ ShareConsumer<byte[], byte[]> shareConsumer4 =
createShareConsumer("group1");
+ shareConsumer4.subscribe(Set.of(tp.topic()));
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ shareConsumer4.poll(Duration.ofMillis(5000));
+ } catch (GroupMaxSizeReachedException e) {
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ return false;
+ }, 30000, 200L, () -> "The 4th consumer was not kicked out of the
group");
+
+ shareConsumer1.close();
+ shareConsumer2.close();
+ shareConsumer3.close();
+ shareConsumer4.close();
+ }
+
@ClusterTest
public void testReadCommittedIsolationLevel() {
alterShareAutoOffsetReset("group1", "earliest");
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index a53f846a01c..44af40ec8f8 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -420,11 +420,18 @@ public class SharePartitionManager implements
AutoCloseable {
* @param shareFetchData The topic-partitions in the share fetch request.
* @param toForget The topic-partitions to forget present in the share
fetch request.
* @param reqMetadata The metadata in the share fetch request.
- * @param isAcknowledgeDataPresent This tells whether the fetch request
received includes piggybacked acknowledgements or not
+ * @param isAcknowledgeDataPresent This tells whether the fetch request
received includes piggybacked acknowledgements or not.
+ * @param clientConnectionId The client connection id.
* @return The new share fetch context object
*/
- public ShareFetchContext newContext(String groupId, List<TopicIdPartition>
shareFetchData,
- List<TopicIdPartition> toForget,
ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
+ public ShareFetchContext newContext(
+ String groupId,
+ List<TopicIdPartition> shareFetchData,
+ List<TopicIdPartition> toForget,
+ ShareRequestMetadata reqMetadata,
+ Boolean isAcknowledgeDataPresent,
+ String clientConnectionId
+ ) {
ShareFetchContext context;
// If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should
remove the existing sessions. Also, start a
// new session in case it is INITIAL_EPOCH. Hence, we need to treat
them as special cases.
@@ -448,7 +455,8 @@ public class SharePartitionManager implements AutoCloseable
{
ImplicitLinkedHashCollection<>(shareFetchData.size());
shareFetchData.forEach(topicIdPartition ->
cachedSharePartitions.mustAdd(new
CachedSharePartition(topicIdPartition, false)));
- ShareSessionKey responseShareSessionKey =
cache.maybeCreateSession(groupId, reqMetadata.memberId(),
cachedSharePartitions);
+ ShareSessionKey responseShareSessionKey =
cache.maybeCreateSession(groupId, reqMetadata.memberId(),
+ cachedSharePartitions, clientConnectionId);
if (responseShareSessionKey == null) {
log.error("Could not create a share session for group {}
member {}", groupId, reqMetadata.memberId());
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 62d01d89b90..d219e6461cd 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -259,7 +259,13 @@ class BrokerServer(
Optional.of(clientMetricsManager)
)
- val connectionDisconnectListeners =
Seq(clientMetricsManager.connectionDisconnectListener())
+ val shareFetchSessionCache : ShareSessionCache = new
ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
+
+ val connectionDisconnectListeners = Seq(
+ clientMetricsManager.connectionDisconnectListener(),
+ shareFetchSessionCache.connectionDisconnectListener()
+ )
+
// Create and start the socket server acceptor threads so that the bound
port is known.
// Delay starting processors until the end of the initialization
sequence to ensure
// that credentials have been loaded before processing authentications.
@@ -426,8 +432,6 @@ class BrokerServer(
))
val fetchManager = new FetchManager(Time.SYSTEM, new
FetchSessionCache(fetchSessionCacheShards))
- val shareFetchSessionCache : ShareSessionCache = new
ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
-
sharePartitionManager = new SharePartitionManager(
replicaManager,
time,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index e6dda8626b8..1e5180ae8db 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3041,7 +3041,7 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
// Creating the shareFetchContext for Share Session Handling. if context
creation fails, the request is failed directly here.
- shareFetchContext = sharePartitionManager.newContext(groupId,
shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent)
+ shareFetchContext = sharePartitionManager.newContext(groupId,
shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent,
request.context.connectionId)
} catch {
case e: Exception =>
requestHelper.sendMaybeThrottle(request,
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index a45724dd1e7..0b3f8e4828d 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -153,6 +153,7 @@ public class SharePartitionManagerTest {
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
private static final String TIMER_NAME_PREFIX = "share-partition-manager";
+ private static final String CONNECTION_ID = "id-1";
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
@@ -200,12 +201,12 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId,
ShareRequestMetadata.FINAL_EPOCH);
- ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
List.of(), List.of(), reqMetadata2, true);
+ ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
List.of(), List.of(), reqMetadata2, true, CONNECTION_ID);
assertEquals(FinalContext.class, context2.getClass());
}
@@ -217,7 +218,6 @@ public class SharePartitionManagerTest {
.build();
Uuid tpId0 = Uuid.randomUuid();
- Uuid tpId1 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new
TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new
TopicPartition("foo", 1));
@@ -228,16 +228,15 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId,
ShareRequestMetadata.FINAL_EPOCH);
- // shareFetch is not empty, but the maxBytes of topic partition is 0,
which means this is added only for acknowledgements.
- // New context should be created successfully
- List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1,
new TopicPartition("foo", 0)));
- ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData3, List.of(), reqMetadata2, true);
+ // Sending a Request with FINAL_EPOCH. This should return a
FinalContext.
+ List<TopicIdPartition> reqData2 = List.of(tp0, tp1);
+ ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, true, CONNECTION_ID);
assertEquals(FinalContext.class, context2.getClass());
}
@@ -260,16 +259,16 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId,
ShareRequestMetadata.FINAL_EPOCH);
// shareFetch is not empty, and it contains tpId1, which should return
FinalContext instance since it is FINAL_EPOCH
- List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1,
new TopicPartition("foo", 0)));
+ List<TopicIdPartition> reqData2 = List.of(new TopicIdPartition(tpId1,
new TopicPartition("foo", 0)));
assertInstanceOf(FinalContext.class,
- sharePartitionManager.newContext(groupId, reqData3, List.of(),
reqMetadata2, true));
+ sharePartitionManager.newContext(groupId, reqData2, List.of(),
reqMetadata2, true, CONNECTION_ID));
}
@Test
@@ -295,7 +294,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData2 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata2 = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false);
+ ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
@@ -314,16 +313,16 @@ public class SharePartitionManagerTest {
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true,
"id-2"));
// Test trying to create a new session with a non-existent session key
Uuid memberId4 = Uuid.randomUuid();
assertThrows(ShareSessionNotFoundException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(memberId4, 1), true));
+ new ShareRequestMetadata(memberId4, 1), true, "id-3"));
// Continue the first share session we created.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true,
CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context5);
assertTrue(((ShareSessionContext) context5).isSubsequent());
@@ -341,18 +340,18 @@ public class SharePartitionManagerTest {
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true,
CONNECTION_ID));
// Test generating a throttled response for a subsequent share session
ShareFetchContext context7 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true,
CONNECTION_ID);
ShareFetchResponse resp7 = context7.throttleResponse(100);
assertEquals(Errors.NONE, resp7.error());
assertEquals(100, resp7.throttleTimeMs());
// Get the final share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(reqMetadata2.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true);
+ new ShareRequestMetadata(reqMetadata2.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context8.getClass());
assertEquals(1, cache.size());
@@ -389,7 +388,7 @@ public class SharePartitionManagerTest {
String groupId = "grp";
ShareRequestMetadata reqMetadata1 = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
respData1 = new LinkedHashMap<>();
@@ -405,7 +404,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> removed2 = new ArrayList<>();
removed2.add(tp0);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, removed2,
- new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
+ new ShareRequestMetadata(reqMetadata1.memberId(), 1), true,
CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
Set<TopicIdPartition> expectedTopicIdPartitions2 = new HashSet<>();
@@ -452,7 +451,7 @@ public class SharePartitionManagerTest {
String groupId = "grp";
ShareRequestMetadata reqMetadata1 = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
respData1 = new LinkedHashMap<>();
@@ -469,7 +468,7 @@ public class SharePartitionManagerTest {
removed2.add(foo0);
removed2.add(foo1);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
List.of(), removed2,
- new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
+ new ShareRequestMetadata(reqMetadata1.memberId(), 1), true,
CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
respData2 = new LinkedHashMap<>();
@@ -495,14 +494,14 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(foo, bar);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertPartitionsPresent((ShareSessionContext) context1, List.of(foo,
bar));
mockUpdateAndGenerateResponseData(context1, groupId,
reqMetadata1.memberId());
ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
List.of(), List.of(foo),
- new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
+ new ShareRequestMetadata(reqMetadata1.memberId(), 1), true,
CONNECTION_ID);
// So foo is removed but not the others.
assertPartitionsPresent((ShareSessionContext) context2, List.of(bar));
@@ -510,7 +509,7 @@ public class SharePartitionManagerTest {
mockUpdateAndGenerateResponseData(context2, groupId,
reqMetadata1.memberId());
ShareFetchContext context3 = sharePartitionManager.newContext(groupId,
List.of(), List.of(bar),
- new ShareRequestMetadata(reqMetadata1.memberId(), 2), true);
+ new ShareRequestMetadata(reqMetadata1.memberId(), 2), true,
CONNECTION_ID);
assertPartitionsPresent((ShareSessionContext) context3, List.of());
}
@@ -537,7 +536,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(foo, bar);
ShareRequestMetadata reqMetadata1 = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
@@ -553,7 +552,7 @@ public class SharePartitionManagerTest {
// Create a subsequent share fetch request as though no topics changed.
ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
+ new ShareRequestMetadata(reqMetadata1.memberId(), 1), true,
CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertTrue(((ShareSessionContext) context2).isSubsequent());
@@ -587,7 +586,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData2 = List.of(tp0, tp1, tpNull1);
ShareRequestMetadata reqMetadata2 = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false);
+ ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
assertErroneousAndValidTopicIdPartitions(context2.getErroneousAndValidTopicIdPartitions(),
List.of(tpNull1), List.of(tp0, tp1));
@@ -609,15 +608,15 @@ public class SharePartitionManagerTest {
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true));
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true, CONNECTION_ID));
// Test trying to create a new session with a non-existent session key
assertThrows(ShareSessionNotFoundException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(Uuid.randomUuid(), 1), true));
+ new ShareRequestMetadata(Uuid.randomUuid(), 1), true,
CONNECTION_ID));
// Continue the first share session we created.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 1),
true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 1),
true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context5);
assertTrue(((ShareSessionContext) context5).isSubsequent());
@@ -628,12 +627,12 @@ public class SharePartitionManagerTest {
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true));
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true, CONNECTION_ID));
// Test generating a throttled response for a subsequent share session
List<TopicIdPartition> reqData7 = List.of(tpNull2);
ShareFetchContext context7 = sharePartitionManager.newContext(groupId,
reqData7, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 2),
true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 2),
true, CONNECTION_ID);
// Check for throttled response
ShareFetchResponse resp7 = context7.throttleResponse(100);
assertEquals(Errors.NONE, resp7.error());
@@ -643,7 +642,7 @@ public class SharePartitionManagerTest {
// Get the final share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(reqMetadata2.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true);
+ new ShareRequestMetadata(reqMetadata2.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context8.getClass());
assertEquals(1, cache.size());
@@ -688,7 +687,7 @@ public class SharePartitionManagerTest {
short version = ApiKeys.SHARE_FETCH.latestVersion();
ShareRequestMetadata reqMetadata2 = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false);
+ ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
@@ -708,17 +707,17 @@ public class SharePartitionManagerTest {
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true));
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true, CONNECTION_ID));
// Test trying to create a new session with a non-existent session key
Uuid memberId4 = Uuid.randomUuid();
assertThrows(ShareSessionNotFoundException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(memberId4, 1), true));
+ new ShareRequestMetadata(memberId4, 1), true, CONNECTION_ID));
// Continue the first share session we created.
List<TopicIdPartition> reqData5 = List.of(tp2);
ShareFetchContext context5 = sharePartitionManager.newContext(groupId,
reqData5, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 1),
true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 1),
true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context5);
assertTrue(((ShareSessionContext) context5).isSubsequent());
@@ -733,11 +732,11 @@ public class SharePartitionManagerTest {
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () ->
sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true));
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 5),
true, CONNECTION_ID));
// Test generating a throttled response for a subsequent share session
ShareFetchContext context7 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey2.memberId(), 2),
true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 2),
true, CONNECTION_ID);
int respSize7 = context7.responseSize(respData2, version);
ShareFetchResponse resp7 = context7.throttleResponse(100);
@@ -748,7 +747,7 @@ public class SharePartitionManagerTest {
// Get the final share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(reqMetadata2.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true);
+ new ShareRequestMetadata(reqMetadata2.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context8.getClass());
assertEquals(1, cache.size());
@@ -794,7 +793,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new
ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
+ ShareFetchContext context1 = sharePartitionManager.newContext(groupId,
reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
@@ -815,7 +814,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData2 = List.of(tp2);
ShareRequestMetadata reqMetadata2 = new
ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH);
- ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false);
+ ShareFetchContext context2 = sharePartitionManager.newContext(groupId,
reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
@@ -833,7 +832,7 @@ public class SharePartitionManagerTest {
// Continue the first share session we created.
List<TopicIdPartition> reqData3 = List.of(tp2);
ShareFetchContext context3 = sharePartitionManager.newContext(groupId,
reqData3, EMPTY_PART_LIST,
- new ShareRequestMetadata(shareSessionKey1.memberId(), 1),
true);
+ new ShareRequestMetadata(shareSessionKey1.memberId(), 1),
true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context3);
assertTrue(((ShareSessionContext) context3).isSubsequent());
@@ -848,7 +847,7 @@ public class SharePartitionManagerTest {
// Continue the second session we created.
List<TopicIdPartition> reqData4 = List.of(tp3);
ShareFetchContext context4 = sharePartitionManager.newContext(groupId,
reqData4, List.of(tp2),
- new ShareRequestMetadata(shareSessionKey2.memberId(), 1),
true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 1),
true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context4);
assertTrue(((ShareSessionContext) context4).isSubsequent());
@@ -861,7 +860,7 @@ public class SharePartitionManagerTest {
// Get the final share session.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId,
List.of(), EMPTY_PART_LIST,
- new ShareRequestMetadata(reqMetadata1.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true);
+ new ShareRequestMetadata(reqMetadata1.memberId(),
ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context5.getClass());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
respData5 = new LinkedHashMap<>();
@@ -876,7 +875,7 @@ public class SharePartitionManagerTest {
// Continue the second share session .
ShareFetchContext context6 = sharePartitionManager.newContext(groupId,
List.of(), List.of(tp3),
- new ShareRequestMetadata(shareSessionKey2.memberId(), 2),
true);
+ new ShareRequestMetadata(shareSessionKey2.memberId(), 2),
true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context6);
assertTrue(((ShareSessionContext) context6).isSubsequent());
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index df939c29ffb..e7926df3e36 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -33,9 +33,11 @@ import org.apache.kafka.common.utils.ProducerIdAndEpoch
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import java.net.Socket
import java.util.{Comparator, Properties}
import java.util.stream.Collectors
import scala.collection.Seq
+import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@@ -46,6 +48,8 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
protected var producer: KafkaProducer[String, String] = _
+ protected var openSockets: ListBuffer[Socket] = ListBuffer[Socket]()
+
protected def createOffsetsTopic(): Unit = {
val admin = cluster.admin()
try {
@@ -140,6 +144,14 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
}
+ protected def closeSockets(): Unit = {
+ while (openSockets.nonEmpty) {
+ val socket = openSockets.head
+ socket.close()
+ openSockets.remove(0)
+ }
+ }
+
protected def closeProducer(): Unit = {
if(producer != null)
producer.close()
@@ -922,6 +934,23 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
)
}
+ protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+ request: AbstractRequest,
+ destination: Int
+ )(implicit classTag: ClassTag[T]): T = {
+ val socket = IntegrationTestUtils.connect(brokerSocketServer(destination),
cluster.clientListener())
+ openSockets += socket
+ IntegrationTestUtils.sendAndReceive[T](request, socket)
+ }
+
+ protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+ request: AbstractRequest
+ )(implicit classTag: ClassTag[T]): T = {
+ val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(),
cluster.clientListener())
+ openSockets += socket
+ IntegrationTestUtils.sendAndReceive[T](request, socket)
+ }
+
private def brokerSocketServer(brokerId: Int): SocketServer = {
getBrokers.find { broker =>
broker.config.brokerId == brokerId
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 093ef4943f9..18c1d7b3b6c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4188,7 +4188,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId,
shareSessionEpoch), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)))
)
@@ -4257,10 +4257,11 @@ class KafkaApisTest extends Logging {
cachedSharePartitions.mustAdd(new CachedSharePartition(
new TopicIdPartition(topicId, partitionIndex, topicName), false))
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenThrow(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenThrow(
Errors.INVALID_REQUEST.exception()
- ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions, 2)))
+ ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2
+ )))
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4351,7 +4352,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@@ -4436,7 +4437,7 @@ class KafkaApisTest extends Logging {
FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@@ -4501,9 +4502,10 @@ class KafkaApisTest extends Logging {
cachedSharePartitions.mustAdd(new CachedSharePartition(
new TopicIdPartition(topicId, partitionIndex, topicName), false))
- when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
- .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions, 2)))
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any()))
+ .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
+ )
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4559,9 +4561,10 @@ class KafkaApisTest extends Logging {
cachedSharePartitions.mustAdd(new CachedSharePartition(
new TopicIdPartition(topicId, partitionIndex, topicName), false))
- when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
- .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions, 2)))
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any()))
+ .thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
+ )
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -4615,7 +4618,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@@ -4679,7 +4682,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@@ -4765,7 +4768,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@@ -4900,14 +4903,14 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId, partitionIndex, topicName), false)
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
- ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions, 2))
- ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions, 3))
+ ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
+ ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
2), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions, 3))
)
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
@@ -5162,17 +5165,17 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId4, 0, topicName4), false
))
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 0)),
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 1)),
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0)),
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
))
- ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions1, 2))
- ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions2, 3))
+ ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 2))
+ ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
2), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 3))
).thenReturn(new FinalContext())
when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
@@ -6127,12 +6130,13 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId, 0, topicName), false
))
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0),
util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
- ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
- new ShareSession(new ShareSessionKey(groupId, memberId),
cachedSharePartitions, 2)))
+ ).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId,
1), new ShareSession(
+ new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
+ )
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@@ -6329,7 +6333,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
- when(sharePartitionManager.newContext(any(), any(), any(), any(),
any())).thenReturn(
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any())).thenReturn(
new FinalContext()
)
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index ef02c576dd1..7aba491536f 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -43,6 +43,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
@AfterEach
def tearDown(): Unit = {
closeProducer
+ closeSockets
}
@ClusterTest(
@@ -59,7 +60,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
assertEquals(Errors.UNSUPPORTED_VERSION.code,
shareFetchResponse.data.errorCode)
assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
@@ -75,7 +76,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val metadata: ShareRequestMetadata = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, Map.empty)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
assertEquals(Errors.UNSUPPORTED_VERSION.code,
shareAcknowledgeResponse.data.errorCode)
}
@@ -123,9 +124,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Send the share fetch request to the non-replica and verify the error
code
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest,
nonReplicaId)
assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs)
-
val partitionData =
shareFetchResponse.responseData(topicNames).get(topicIdPartition)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
assertEquals(leader, partitionData.currentLeader().leaderId())
@@ -174,7 +174,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMap)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -245,7 +245,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// as the share partitions might not be initialized yet. So, we retry
until we get the response.
var responses = Seq[ShareFetchResponseData.PartitionData]()
TestUtils.waitUntilTrue(() => {
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
@@ -340,9 +340,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2,
Seq.empty, acknowledgementsMap)
var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3,
Seq.empty, acknowledgementsMap)
- var shareFetchResponse1 =
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
- var shareFetchResponse2 =
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
- var shareFetchResponse3 =
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
+ var shareFetchResponse1 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1,
destination = leader1)
+ var shareFetchResponse2 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2,
destination = leader2)
+ var shareFetchResponse3 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3,
destination = leader3)
initProducer()
// Producing 10 records to the topic partitions created above
@@ -356,9 +356,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2,
Seq.empty, acknowledgementsMap)
shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3,
Seq.empty, acknowledgementsMap)
- shareFetchResponse1 =
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
- shareFetchResponse2 =
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
- shareFetchResponse3 =
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
+ shareFetchResponse1 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1,
destination = leader1)
+ shareFetchResponse2 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2,
destination = leader2)
+ shareFetchResponse3 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3,
destination = leader3)
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
@@ -451,7 +451,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -478,7 +478,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) //
Accept the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMapForAcknowledge)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@@ -500,7 +500,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -566,7 +566,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -595,7 +595,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) //
Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -620,7 +620,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -684,7 +684,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -711,7 +711,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) //
Release the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMapForAcknowledge)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@@ -730,7 +730,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -794,7 +794,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -839,7 +839,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
releaseAcknowledgementSent = true
}
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -908,7 +908,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -935,7 +935,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) //
Reject the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMapForAcknowledge)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@@ -957,7 +957,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1021,7 +1021,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1050,7 +1050,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) //
Reject the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1075,7 +1075,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1141,7 +1141,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1168,7 +1168,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) //
Release the records
var shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMapForAcknowledge)
- var shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ var shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
var shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@@ -1187,7 +1187,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1213,7 +1213,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) //
Release the records again
shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata,
acknowledgementsMapForAcknowledge)
- shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@@ -1235,7 +1235,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1312,9 +1312,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val acknowledgementsMap3: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send,
Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
- val shareFetchResponse1 =
connectAndReceive[ShareFetchResponse](shareFetchRequest1)
- val shareFetchResponse2 =
connectAndReceive[ShareFetchResponse](shareFetchRequest2)
- val shareFetchResponse3 =
connectAndReceive[ShareFetchResponse](shareFetchRequest3)
+ val shareFetchResponse1 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1)
+ val shareFetchResponse2 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2)
+ val shareFetchResponse3 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3)
val shareFetchResponseData1 = shareFetchResponse1.data()
@@ -1407,9 +1407,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val acknowledgementsMap3: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3,
send, Seq.empty, acknowledgementsMap3)
- val shareFetchResponse1 =
connectAndReceive[ShareFetchResponse](shareFetchRequest1)
- val shareFetchResponse2 =
connectAndReceive[ShareFetchResponse](shareFetchRequest2)
- val shareFetchResponse3 =
connectAndReceive[ShareFetchResponse](shareFetchRequest3)
+ val shareFetchResponse1 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1)
+ val shareFetchResponse2 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2)
+ val shareFetchResponse3 =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3)
val shareFetchResponseData1 = shareFetchResponse1.data()
@@ -1487,7 +1487,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1516,7 +1516,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) //
Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1542,7 +1542,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(19)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) //
Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1595,7 +1595,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1624,7 +1624,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) //
Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMapForFetch)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1651,7 +1651,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(19)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) //
Accept the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMapForAcknowledge)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@@ -1711,7 +1711,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) //
Acknowledgements in the Initial Fetch Request
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMap)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
// The response will have a top level error code because this is an
Initial Fetch request with acknowledgement data present
@@ -1759,7 +1759,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMap)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code,
shareAcknowledgeResponseData.errorCode)
@@ -1809,7 +1809,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1831,7 +1831,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code,
shareFetchResponseData.errorCode)
@@ -1881,7 +1881,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1908,7 +1908,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMap)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code,
shareAcknowledgeResponseData.errorCode)
@@ -1959,7 +1959,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ var shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -1981,12 +1981,94 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code,
shareFetchResponseData.errorCode)
}
+ @ClusterTests(
+ Array(
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "group.share.max.share.sessions",
value="2"),
+ new ClusterConfigProperty(key = "group.share.max.size", value="2")
+ )
+ ),
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "group.share.persister.class.name",
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "1"),
+ new ClusterConfigProperty(key = "group.share.max.share.sessions",
value="2"),
+ new ClusterConfigProperty(key = "group.share.max.size", value="2")
+ )
+ ),
+ )
+ )
+ def testShareSessionEvictedOnConnectionDrop(): Unit = {
+ val groupId: String = "group"
+ val memberId1 = Uuid.randomUuid()
+ val memberId2 = Uuid.randomUuid()
+ val memberId3 = Uuid.randomUuid()
+
+ val topic = "topic"
+ val partition = 0
+
+ createTopicAndReturnLeaders(topic, numPartitions = 3)
+ val topicIds = getTopicIds.asJava
+ val topicId = topicIds.get(topic)
+ val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // member1 sends share fetch request to register it's share session. Note
it does not close the socket connection after.
+ TestUtils.waitUntilTrue(() => {
+ val metadata = new ShareRequestMetadata(memberId1,
ShareRequestMetadata.INITIAL_EPOCH)
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+ shareFetchResponseData.errorCode == Errors.NONE.code
+ }, "Share fetch request failed", 5000)
+
+ // member2 sends share fetch request to register it's share session. Note
it does not close the socket connection after.
+ TestUtils.waitUntilTrue(() => {
+ val metadata = new ShareRequestMetadata(memberId2,
ShareRequestMetadata.INITIAL_EPOCH)
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+ shareFetchResponseData.errorCode == Errors.NONE.code
+ }, "Share fetch request failed", 5000)
+
+ // member3 sends share fetch request to register it's share session. Since
the maximum number of share sessions that could
+ // exist in the share session cache is 2 (group.share.max.share.sessions),
the attempt to register a third
+ // share session with the ShareSessionCache would throw
SHARE_SESSION_LIMIT_REACHED
+ TestUtils.waitUntilTrue(() => {
+ val metadata = new ShareRequestMetadata(memberId3,
ShareRequestMetadata.INITIAL_EPOCH)
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+ shareFetchResponseData.errorCode == Errors.SHARE_SESSION_NOT_FOUND.code
+ }, "Share fetch request failed", 5000)
+
+ // Now we will close the socket connections for the above three members,
mimicking a client disconnection
+ closeSockets()
+
+ // Since the socket connections were closed before, the corresponding
share sessions were dropped from the ShareSessionCache
+ // on the broker. Now, since the cache is empty, new share sessions can be
registered
+ TestUtils.waitUntilTrue(() => {
+ val metadata = new ShareRequestMetadata(memberId3,
ShareRequestMetadata.INITIAL_EPOCH)
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponseData = shareFetchResponse.data()
+ shareFetchResponseData.errorCode == Errors.NONE.code
+ }, "Share fetch request failed", 5000)
+ }
+
@ClusterTests(
Array(
new ClusterTest(
@@ -2032,7 +2114,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -2059,7 +2141,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, acknowledgementsMap)
- val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
+ val shareAcknowledgeResponse =
connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code,
shareAcknowledgeResponseData.errorCode)
@@ -2118,7 +2200,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// as the share partitions might not be initialized yet. So, we retry
until we get the response.
var responses = Seq[ShareFetchResponseData.PartitionData]()
TestUtils.waitUntilTrue(() => {
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
@@ -2144,7 +2226,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
shareFetchRequest = createShareFetchRequest(groupId, metadata, Seq.empty,
forget, acknowledgementsMap)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@@ -2294,7 +2376,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
topicIdPartitions, Seq.empty, Map.empty)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ val shareFetchResponse =
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index a20cdfe9057..865870eef3b 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -587,6 +587,80 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
}
@ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "group.share.max.size", value = "2")
+ ))
+ def testShareGroupMaxSizeConfigExceeded(): Unit = {
+ val groupId: String = "group"
+ val memberId1 = Uuid.randomUuid()
+ val memberId2 = Uuid.randomUuid()
+ val memberId3 = Uuid.randomUuid()
+
+ val admin = cluster.admin()
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group by the first member (memberId1).
+ var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1.toString)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List("foo").asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a
delay
+ // here because the group coordinator is loaded in the background.
+ var shareGroupHeartbeatResponse: ShareGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+
+ // Heartbeat request to join the group by the second member (memberId2).
+ shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2.toString)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List("foo").asJava)
+ ).build()
+
+ // Send the request until receiving a successful response
+ TestUtils.waitUntilTrue(() => {
+ shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+
+ // Heartbeat request to join the group by the third member (memberId3).
+ shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId3.toString)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List("foo").asJava)
+ ).build()
+
+ shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
+ // Since the group.share.max.size config is set to 2, a third member
cannot join the same group.
+ assertEquals(shareGroupHeartbeatResponse.data.errorCode,
Errors.GROUP_MAX_SIZE_REACHED.code)
+
+ } finally {
+ admin.close()
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
diff --git
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index 0b06ea535be..f0f37d9ec7d 100644
---
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
+++
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.network.ConnectionDisconnectListener;
import org.apache.kafka.server.share.CachedSharePartition;
import com.yammer.metrics.core.Meter;
@@ -53,10 +54,13 @@ public class ShareSessionCache {
private final int maxEntries;
private long numPartitions = 0;
+ private final ConnectionDisconnectListener connectionDisconnectListener;
// A map of session key to ShareSession.
private final Map<ShareSessionKey, ShareSession> sessions = new
HashMap<>();
+ private final Map<String, ShareSessionKey> connectionIdToSessionMap;
+
@SuppressWarnings("this-escape")
public ShareSessionCache(int maxEntries) {
this.maxEntries = maxEntries;
@@ -64,6 +68,8 @@ public class ShareSessionCache {
KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server",
"ShareSessionCache");
metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
metricsGroup.newGauge(SHARE_PARTITIONS_COUNT, this::totalPartitions);
+ this.connectionIdToSessionMap = new HashMap<>();
+ this.connectionDisconnectListener = new
ClientConnectionDisconnectListener();
this.evictionsMeter =
metricsGroup.newMeter(SHARE_SESSION_EVICTIONS_PER_SEC, "evictions",
TimeUnit.SECONDS);
}
@@ -123,21 +129,48 @@ public class ShareSessionCache {
* @param groupId - The group id in the share fetch request.
* @param memberId - The member id in the share fetch request.
* @param partitionMap - The topic partitions to be added to the session.
+ * @param clientConnectionId - The client connection id.
* @return - The session key if the session was created, or null if the
session was not created.
*/
- public synchronized ShareSessionKey maybeCreateSession(String groupId,
Uuid memberId, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap)
{
+ public synchronized ShareSessionKey maybeCreateSession(
+ String groupId,
+ Uuid memberId,
+ ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
+ String clientConnectionId
+ ) {
if (sessions.size() < maxEntries) {
ShareSession session = new ShareSession(new
ShareSessionKey(groupId, memberId), partitionMap,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
sessions.put(session.key(), session);
updateNumPartitions(session);
+ connectionIdToSessionMap.put(clientConnectionId, session.key());
return session.key();
}
return null;
}
+ public ConnectionDisconnectListener connectionDisconnectListener() {
+ return connectionDisconnectListener;
+ }
+
// Visible for testing.
Meter evictionsMeter() {
return evictionsMeter;
}
+
+ private final class ClientConnectionDisconnectListener implements
ConnectionDisconnectListener {
+
+ // When the client disconnects, the corresponding session should be
removed from the cache.
+ @Override
+ public void onDisconnect(String connectionId) {
+ ShareSessionKey shareSessionKey =
connectionIdToSessionMap.remove(connectionId);
+ if (shareSessionKey != null) {
+ // Remove the session from the cache.
+ ShareSession removedSession = remove(shareSessionKey);
+ if (removedSession != null) {
+ evictionsMeter.mark();
+ }
+ }
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index ca18de5b65c..c9692063b5c 100644
---
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -45,11 +45,11 @@ public class ShareSessionCacheTest {
public void testShareSessionCache() throws InterruptedException {
ShareSessionCache cache = new ShareSessionCache(3);
assertEquals(0, cache.size());
- ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(10));
- ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(20));
- ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(30));
- assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(40)));
- assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(5)));
+ ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(10), "conn-1");
+ ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(20), "conn-2");
+ ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(30), "conn-3");
+ assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(40), "conn-4"));
+ assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(5), "conn-5"));
assertShareCacheContains(cache, List.of(key1, key2, key3));
assertMetricsValues(3, 60, 0, cache);
@@ -60,7 +60,7 @@ public class ShareSessionCacheTest {
ShareSessionCache cache = new ShareSessionCache(2);
assertEquals(0, cache.size());
assertEquals(0, cache.totalPartitions());
- ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(2));
+ ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-1");
assertNotNull(key1);
assertShareCacheContains(cache, List.of(key1));
ShareSession session1 = cache.get(key1);
@@ -70,7 +70,7 @@ public class ShareSessionCacheTest {
assertMetricsValues(1, 2, 0, cache);
- ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(4));
+ ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(4), "conn-2");
assertNotNull(key2);
assertShareCacheContains(cache, List.of(key1, key2));
ShareSession session2 = cache.get(key2);
@@ -81,7 +81,7 @@ public class ShareSessionCacheTest {
assertMetricsValues(2, 6, 0, cache);
- ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(5));
+ ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(5), "conn-3");
assertNull(key3);
assertShareCacheContains(cache, List.of(key1, key2));
assertEquals(6, cache.totalPartitions());
@@ -109,6 +109,36 @@ public class ShareSessionCacheTest {
assertMetricsValues(1, 3, 0, cache);
}
+ @Test
+ public void testRemoveConnection() throws InterruptedException {
+ ShareSessionCache cache = new ShareSessionCache(3);
+ assertEquals(0, cache.size());
+ ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(1), "conn-1");
+ ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-2");
+ ShareSessionKey key3 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(3), "conn-3");
+
+ assertMetricsValues(3, 6, 0, cache);
+
+ // Since cache size is now equal to max entries allowed(3), no new
session can be created.
+ assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(40), "conn-4"));
+ assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(5), "conn-5"));
+ assertShareCacheContains(cache, List.of(key1, key2, key3));
+
+ assertMetricsValues(3, 6, 0, cache);
+
+ // Simulating the disconnection of client with connection id conn-1
+ cache.connectionDisconnectListener().onDisconnect("conn-1");
+ assertShareCacheContains(cache, List.of(key2, key3));
+
+ assertMetricsValues(2, 5, 1, cache);
+
+ // Since one client got disconnected, we can add another one now
+ ShareSessionKey key4 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(4), "conn-6");
+ assertShareCacheContains(cache, List.of(key2, key3, key4));
+
+ assertMetricsValues(3, 9, 1, cache);
+ }
+
private ImplicitLinkedHashCollection<CachedSharePartition>
mockedSharePartitionMap(int size) {
ImplicitLinkedHashCollection<CachedSharePartition> cacheMap = new
ImplicitLinkedHashCollection<>(size);