This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5ce463ed3d KAFKA-19253: Improve metadata handling for share version
using feature listeners (1/N) (#19659)
d5ce463ed3d is described below
commit d5ce463ed3dfea84af3fde05ceb267a8133a07bc
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue May 13 19:31:03 2025 +0530
KAFKA-19253: Improve metadata handling for share version using feature
listeners (1/N) (#19659)
This PR creates a listener for `SharePartitionManager` to listen for any
changes in the `ShareVersion` feature. If the feature is toggled, we need
to change the attributes in `SharePartitionManager` accordingly.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/server/share/SharePartitionManager.java | 27 ++++++++
.../src/main/scala/kafka/server/BrokerServer.scala | 13 +++-
.../server/metadata/BrokerMetadataPublisher.scala | 27 +++++++-
.../server/share/SharePartitionManagerTest.java | 80 ++++++++++++++++++----
.../metadata/BrokerMetadataPublisherTest.scala | 76 ++++++++++++++++++--
.../server/share/session/ShareSessionCache.java | 35 +++++++++-
.../share/session/ShareSessionCacheTest.java | 21 +++++-
7 files changed, 253 insertions(+), 26 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 44af40ec8f8..88321c1b87e 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
@@ -746,6 +747,32 @@ public class SharePartitionManager implements
AutoCloseable {
};
}
+ /**
+ * The handler for share version feature metadata changes.
+ * @param shareVersion the new share version feature
+ */
+ public void onShareVersionToggle(ShareVersion shareVersion) {
+ if (!shareVersion.supportsShareGroups()) {
+ cache.updateSupportsShareGroups(false);
+ // Remove all share sessions from share session cache.
+ synchronized (cache) {
+ cache.removeAllSessions();
+ }
+ Set<SharePartitionKey> sharePartitionKeys = new
HashSet<>(partitionCacheMap.keySet());
+ // Remove all share partitions from partition cache.
+ sharePartitionKeys.forEach(sharePartitionKey ->
+ removeSharePartitionFromCache(sharePartitionKey,
partitionCacheMap, replicaManager)
+ );
+ } else {
+ cache.updateSupportsShareGroups(true);
+ }
+ }
+
+ // Visible for testing.
+ protected int partitionCacheSize() {
+ return partitionCacheMap.size();
+ }
+
/**
* The SharePartitionListener is used to listen for partition events. The
share partition is associated with
* the topic-partition, we need to handle the partition events for the
share partition.
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index e2d41cac3bb..f2796a5e5eb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -45,7 +45,8 @@ import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
+import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
+import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, FinalizedFeatures, NodeToControllerChannelManager,
ShareVersion, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType,
DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager,
RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin,
KafkaYammerMetrics}
@@ -259,7 +260,12 @@ class BrokerServer(
Optional.of(clientMetricsManager)
)
- val shareFetchSessionCache : ShareSessionCache = new
ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
+ val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
+ config.shareGroupConfig.shareGroupMaxShareSessions(),
+ ShareVersion.fromFeatureLevel(
+
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)
+ ).supportsShareGroups()
+ )
val connectionDisconnectListeners = Seq(
clientMetricsManager.connectionDisconnectListener(),
@@ -516,7 +522,8 @@ class BrokerServer(
authorizerPlugin.toJava
),
sharedServer.initialBrokerMetadataLoadFaultHandler,
- sharedServer.metadataPublishingFaultHandler
+ sharedServer.metadataPublishingFaultHandler,
+ sharePartitionManager
)
// If the BrokerLifecycleManager's initial catch-up future fails, it
means we timed out
// or are shutting down before we could catch up. Therefore, also fail
the firstPublishFuture.
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 48fd6c95f6c..a62f990d3a0 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -20,6 +20,7 @@ package kafka.server.metadata
import java.util.OptionalInt
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
+import kafka.server.share.SharePartitionManager
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
@@ -32,7 +33,8 @@ import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.publisher.AclPublisher
-import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
+import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal,
ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
@@ -80,6 +82,7 @@ class BrokerMetadataPublisher(
aclPublisher: AclPublisher,
fatalFaultHandler: FaultHandler,
metadataPublishingFaultHandler: FaultHandler,
+ sharePartitionManager: SharePartitionManager
) extends MetadataPublisher with Logging {
logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
@@ -100,6 +103,11 @@ class BrokerMetadataPublisher(
*/
val firstPublishFuture = new CompletableFuture[Void]
+ /**
+ * The share version being used in the broker metadata.
+ */
+ private var finalizedShareVersion: Short =
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)
+
override def name(): String = "BrokerMetadataPublisher"
override def onMetadataUpdate(
@@ -242,6 +250,23 @@ class BrokerMetadataPublisher(
if (_firstPublish) {
finishInitializingReplicaManager()
}
+
+ if (delta.featuresDelta != null) {
+ try {
+ val newFinalizedFeatures = new
FinalizedFeatures(newImage.features.metadataVersionOrThrow,
newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
+ // Share version feature has been toggled.
+ if
(newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort) != finalizedShareVersion) {
+ finalizedShareVersion =
newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)
+ val shareVersion: ShareVersion =
ShareVersion.fromFeatureLevel(finalizedShareVersion)
+ info(s"Feature share.version has been updated to version
$finalizedShareVersion")
+ sharePartitionManager.onShareVersionToggle(shareVersion)
+ }
+ } catch {
+ case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating share partition
manager " +
+ s" with share version feature change in $delta", t)
+ }
+ }
+
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
s"publishing broker metadata from $deltaName", t)
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 15b943499e8..34b8c0c2840 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -50,6 +50,7 @@ import
org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.CachedSharePartition;
@@ -185,7 +186,7 @@ public class SharePartitionManagerTest {
@Test
public void testNewContextReturnsFinalContextWithoutRequestData() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -212,7 +213,7 @@ public class SharePartitionManagerTest {
@Test
public void testNewContextReturnsFinalContextWithRequestData() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -242,7 +243,7 @@ public class SharePartitionManagerTest {
@Test
public void
testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequestData() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -273,7 +274,7 @@ public class SharePartitionManagerTest {
@Test
public void testNewContext() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -434,7 +435,7 @@ public class SharePartitionManagerTest {
@Test
public void testZeroSizeShareSession() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -480,7 +481,7 @@ public class SharePartitionManagerTest {
@Test
public void testToForgetPartitions() {
String groupId = "grp";
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -518,7 +519,7 @@ public class SharePartitionManagerTest {
@Test
public void testShareSessionUpdateTopicIdsBrokerSide() {
String groupId = "grp";
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -569,7 +570,7 @@ public class SharePartitionManagerTest {
@Test
public void testGetErroneousAndValidTopicIdPartitions() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -662,7 +663,7 @@ public class SharePartitionManagerTest {
@Test
public void testShareFetchContextResponseSize() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -763,7 +764,7 @@ public class SharePartitionManagerTest {
@Test
public void testCachedTopicPartitionsWithNoTopicPartitions() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -774,7 +775,7 @@ public class SharePartitionManagerTest {
@Test
public void testCachedTopicPartitionsForValidShareSessions() {
- ShareSessionCache cache = new ShareSessionCache(10);
+ ShareSessionCache cache = new ShareSessionCache(10, true);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
@@ -2803,6 +2804,61 @@ public class SharePartitionManagerTest {
validateRotatedListEquals(topicIdPartitions,
resultShareFetch.topicIdPartitions(), 1);
}
+ @Test
+ public void testShareSessionCacheSupportsShareGroups() {
+ ShareSessionCache cache1 = new ShareSessionCache(10, false);
+ sharePartitionManager = SharePartitionManagerBuilder.builder()
+ .withCache(cache1)
+ .build();
+ // Toggle of supportsShareGroups from false to true.
+ sharePartitionManager.onShareVersionToggle(ShareVersion.SV_1);
+ assertTrue(cache1.supportsShareGroups());
+
+ ShareSessionCache cache2 = new ShareSessionCache(10, true);
+ sharePartitionManager = SharePartitionManagerBuilder.builder()
+ .withCache(cache2)
+ .build();
+ // Toggle of supportsShareGroups from true to false.
+ sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
+ assertFalse(cache2.supportsShareGroups());
+ }
+
+ @Test
+ public void testOnShareVersionToggle() {
+ String groupId = "grp";
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+ SharePartition sp2 = mock(SharePartition.class);
+ SharePartition sp3 = mock(SharePartition.class);
+
+ // Mock the share partitions corresponding to the topic partitions.
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(
+ new SharePartitionKey(groupId, new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0))), sp0
+ );
+ partitionCacheMap.put(
+ new SharePartitionKey(groupId, new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0))), sp1
+ );
+ partitionCacheMap.put(
+ new SharePartitionKey(groupId, new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0))), sp2
+ );
+ partitionCacheMap.put(
+ new SharePartitionKey(groupId, new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 0))), sp3
+ );
+ sharePartitionManager = SharePartitionManagerBuilder.builder()
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
+ assertEquals(4, sharePartitionManager.partitionCacheSize());
+ sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
+ // Because we are toggling to a share version which does not support
share groups, the cache inside share partitions must be cleared.
+ assertEquals(0, sharePartitionManager.partitionCacheSize());
+ //Check if all share partitions have been fenced.
+ Mockito.verify(sp0).markFenced();
+ Mockito.verify(sp1).markFenced();
+ Mockito.verify(sp2).markFenced();
+ Mockito.verify(sp3).markFenced();
+ }
+
private Timer systemTimerReaper() {
return new SystemTimerReaper(
TIMER_NAME_PREFIX + "-test-reaper",
@@ -3012,7 +3068,7 @@ public class SharePartitionManagerTest {
private final Persister persister = new NoOpStatePersister();
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Time time = new MockTime();
- private ShareSessionCache cache = new ShareSessionCache(10);
+ private ShareSessionCache cache = new ShareSessionCache(10, true);
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
private Timer timer = new MockTimer();
private ShareGroupMetrics shareGroupMetrics = new
ShareGroupMetrics(time);
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 20e5072cdd8..82045896b4f 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -20,25 +20,27 @@ package kafka.server.metadata
import kafka.coordinator.transaction.TransactionCoordinator
import java.util.Collections.{singleton, singletonList, singletonMap}
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.log.LogManager
+import kafka.server.share.SharePartitionManager
import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry,
NewTopic}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.BROKER
+import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
-import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataImageTest, MetadataProvenance}
+import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest,
ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta,
MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage,
ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
-import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull,
assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -48,6 +50,7 @@ import org.mockito.Mockito.{doThrow, mock, verify}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
+import java.util
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
@@ -204,7 +207,8 @@ class BrokerMetadataPublisherTest {
mock(classOf[DelegationTokenPublisher]),
mock(classOf[AclPublisher]),
faultHandler,
- faultHandler
+ faultHandler,
+ mock(classOf[SharePartitionManager])
)
val image = MetadataImage.EMPTY
@@ -223,4 +227,68 @@ class BrokerMetadataPublisherTest {
verify(groupCoordinator).onNewMetadataImage(image, delta)
}
+
+ @Test
+ def testNewShareVersionPushedToSharePartitionManager(): Unit = {
+ val sharePartitionManager = mock(classOf[SharePartitionManager])
+ val faultHandler = mock(classOf[FaultHandler])
+
+ val metadataPublisher = new BrokerMetadataPublisher(
+ KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)),
+ new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_1),
+ mock(classOf[LogManager]),
+ mock(classOf[ReplicaManager]),
+ mock(classOf[GroupCoordinator]),
+ mock(classOf[TransactionCoordinator]),
+ mock(classOf[ShareCoordinator]),
+ mock(classOf[DynamicConfigPublisher]),
+ mock(classOf[DynamicClientQuotaPublisher]),
+ mock(classOf[DynamicTopicClusterQuotaPublisher]),
+ mock(classOf[ScramPublisher]),
+ mock(classOf[DelegationTokenPublisher]),
+ mock(classOf[AclPublisher]),
+ faultHandler,
+ faultHandler,
+ sharePartitionManager
+ )
+
+ val featuresImage = new FeaturesImage(
+ util.Map.of(
+ MetadataVersion.FEATURE_NAME,
MetadataVersion.latestTesting().featureLevel(),
+ ShareVersion.FEATURE_NAME, ShareVersion.SV_1.featureLevel()
+ ),
+ MetadataVersion.latestTesting())
+
+ val image = new MetadataImage(
+ MetadataProvenance.EMPTY,
+ featuresImage,
+ ClusterImageTest.IMAGE1,
+ TopicsImage.EMPTY,
+ ConfigurationsImage.EMPTY,
+ ClientQuotasImage.EMPTY,
+ ProducerIdsImage.EMPTY,
+ AclsImage.EMPTY,
+ ScramImage.EMPTY,
+ DelegationTokenImage.EMPTY
+ )
+
+ // Share version 1 is getting passed to features delta.
+ val delta = new MetadataDelta(image)
+ delta.replay(new
FeatureLevelRecord().setName(ShareVersion.FEATURE_NAME).setFeatureLevel(1))
+
+ metadataPublisher.onMetadataUpdate(
+ delta,
+ image,
+ LogDeltaManifest.newBuilder().
+ provenance(MetadataProvenance.EMPTY).
+ leaderAndEpoch(new LeaderAndEpoch(OptionalInt.of(1), 1)).
+ numBatches(1).
+ elapsedNs(1L).
+ numBytes(1).
+ build()
+ )
+
+ // SharePartitionManager is receiving the latest changes.
+ verify(sharePartitionManager).onShareVersionToggle(any())
+ }
}
diff --git
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index f0f37d9ec7d..e57a7c8ed9c 100644
---
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
+++
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -29,6 +29,7 @@ import com.yammer.metrics.core.Meter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Caches share sessions.
@@ -60,10 +61,15 @@ public class ShareSessionCache {
private final Map<ShareSessionKey, ShareSession> sessions = new
HashMap<>();
private final Map<String, ShareSessionKey> connectionIdToSessionMap;
+ /**
+ * Flag indicating if share groups have been turned on.
+ */
+ private final AtomicBoolean supportsShareGroups;
@SuppressWarnings("this-escape")
- public ShareSessionCache(int maxEntries) {
+ public ShareSessionCache(int maxEntries, boolean supportsShareGroups) {
this.maxEntries = maxEntries;
+ this.supportsShareGroups = new AtomicBoolean(supportsShareGroups);
// Register metrics for ShareSessionCache.
KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server",
"ShareSessionCache");
metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
@@ -90,6 +96,14 @@ public class ShareSessionCache {
return sessions.size();
}
+ /**
+ * Remove all the share sessions from cache.
+ */
+ public synchronized void removeAllSessions() {
+ sessions.clear();
+ numPartitions = 0;
+ }
+
public synchronized long totalPartitions() {
return numPartitions;
}
@@ -121,7 +135,9 @@ public class ShareSessionCache {
* @param session The session.
*/
public synchronized void updateNumPartitions(ShareSession session) {
- numPartitions += session.updateCachedSize();
+ if (supportsShareGroups.get()) {
+ numPartitions += session.updateCachedSize();
+ }
}
/**
@@ -138,7 +154,7 @@ public class ShareSessionCache {
ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
String clientConnectionId
) {
- if (sessions.size() < maxEntries) {
+ if (sessions.size() < maxEntries && supportsShareGroups.get()) {
ShareSession session = new ShareSession(new
ShareSessionKey(groupId, memberId), partitionMap,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
sessions.put(session.key(), session);
@@ -173,4 +189,17 @@ public class ShareSessionCache {
}
}
}
+
+ /**
+ * Update the value of supportsShareGroups to reflect if share groups are
turned on.
+ * @param supportsShareGroups - Boolean indicating if share groups are
turned on.
+ */
+ public void updateSupportsShareGroups(boolean supportsShareGroups) {
+ this.supportsShareGroups.set(supportsShareGroups);
+ }
+
+ // Visible for testing.
+ public boolean supportsShareGroups() {
+ return supportsShareGroups.get();
+ }
}
diff --git
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index c9692063b5c..7022fbb59fa 100644
---
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -43,7 +43,7 @@ public class ShareSessionCacheTest {
@Test
public void testShareSessionCache() throws InterruptedException {
- ShareSessionCache cache = new ShareSessionCache(3);
+ ShareSessionCache cache = new ShareSessionCache(3, true);
assertEquals(0, cache.size());
ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(10), "conn-1");
ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(20), "conn-2");
@@ -57,7 +57,7 @@ public class ShareSessionCacheTest {
@Test
public void testResizeCachedSessions() throws InterruptedException {
- ShareSessionCache cache = new ShareSessionCache(2);
+ ShareSessionCache cache = new ShareSessionCache(2, true);
assertEquals(0, cache.size());
assertEquals(0, cache.totalPartitions());
ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-1");
@@ -111,7 +111,7 @@ public class ShareSessionCacheTest {
@Test
public void testRemoveConnection() throws InterruptedException {
- ShareSessionCache cache = new ShareSessionCache(3);
+ ShareSessionCache cache = new ShareSessionCache(3, true);
assertEquals(0, cache.size());
ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(1), "conn-1");
ShareSessionKey key2 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-2");
@@ -139,6 +139,21 @@ public class ShareSessionCacheTest {
assertMetricsValues(3, 9, 1, cache);
}
+ @Test
+ public void testRemoveAllSessions() {
+ ShareSessionCache cache = new ShareSessionCache(3, true);
+ assertEquals(0, cache.size());
+ assertEquals(0, cache.totalPartitions());
+ cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(10), "conn-1");
+ cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(20), "conn-2");
+ cache.maybeCreateSession("grp", Uuid.randomUuid(),
mockedSharePartitionMap(30), "conn-3");
+ assertEquals(3, cache.size());
+ assertEquals(60, cache.totalPartitions());
+ cache.removeAllSessions();
+ assertEquals(0, cache.size());
+ assertEquals(0, cache.totalPartitions());
+ }
+
private ImplicitLinkedHashCollection<CachedSharePartition>
mockedSharePartitionMap(int size) {
ImplicitLinkedHashCollection<CachedSharePartition> cacheMap = new
ImplicitLinkedHashCollection<>(size);