This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5ce463ed3d KAFKA-19253: Improve metadata handling for share version 
using feature listeners (1/N) (#19659)
d5ce463ed3d is described below

commit d5ce463ed3dfea84af3fde05ceb267a8133a07bc
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue May 13 19:31:03 2025 +0530

    KAFKA-19253: Improve metadata handling for share version using feature 
listeners (1/N) (#19659)
    
    This PR creates a listener for `SharePartitionManager` to listen to any
    changes in `ShareVersion` feature. In case, there is a toggle, we need
    to change the attributes in `SharePartitionManager` accordingly.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/server/share/SharePartitionManager.java  | 27 ++++++++
 .../src/main/scala/kafka/server/BrokerServer.scala | 13 +++-
 .../server/metadata/BrokerMetadataPublisher.scala  | 27 +++++++-
 .../server/share/SharePartitionManagerTest.java    | 80 ++++++++++++++++++----
 .../metadata/BrokerMetadataPublisherTest.scala     | 76 ++++++++++++++++++--
 .../server/share/session/ShareSessionCache.java    | 35 +++++++++-
 .../share/session/ShareSessionCacheTest.java       | 21 +++++-
 7 files changed, 253 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 44af40ec8f8..88321c1b87e 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.ShareRequestMetadata;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.share.CachedSharePartition;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
@@ -746,6 +747,32 @@ public class SharePartitionManager implements 
AutoCloseable {
         };
     }
 
+    /**
+     * The handler for share version feature metadata changes.
+     * @param shareVersion the new share version feature
+     */
+    public void onShareVersionToggle(ShareVersion shareVersion) {
+        if (!shareVersion.supportsShareGroups()) {
+            cache.updateSupportsShareGroups(false);
+            // Remove all share sessions from share session cache.
+            synchronized (cache) {
+                cache.removeAllSessions();
+            }
+            Set<SharePartitionKey> sharePartitionKeys = new 
HashSet<>(partitionCacheMap.keySet());
+            // Remove all share partitions from partition cache.
+            sharePartitionKeys.forEach(sharePartitionKey ->
+                removeSharePartitionFromCache(sharePartitionKey, 
partitionCacheMap, replicaManager)
+            );
+        } else {
+            cache.updateSupportsShareGroups(true);
+        }
+    }
+
+    // Visible for testing.
+    protected int partitionCacheSize() {
+        return partitionCacheMap.size();
+    }
+
     /**
      * The SharePartitionListener is used to listen for partition events. The 
share partition is associated with
      * the topic-partition, we need to handle the partition events for the 
share partition.
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index e2d41cac3bb..f2796a5e5eb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -45,7 +45,8 @@ import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
 import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
+import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
+import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, FinalizedFeatures, NodeToControllerChannelManager, 
ShareVersion, TopicIdPartition}
 import org.apache.kafka.server.config.{ConfigType, 
DelegationTokenManagerConfigs}
 import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, 
RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, 
KafkaYammerMetrics}
@@ -259,7 +260,12 @@ class BrokerServer(
         Optional.of(clientMetricsManager)
       )
 
-      val shareFetchSessionCache : ShareSessionCache = new 
ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
+      val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
+        config.shareGroupConfig.shareGroupMaxShareSessions(),
+        ShareVersion.fromFeatureLevel(
+          
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)
+        ).supportsShareGroups()
+      )
 
       val connectionDisconnectListeners = Seq(
         clientMetricsManager.connectionDisconnectListener(),
@@ -516,7 +522,8 @@ class BrokerServer(
           authorizerPlugin.toJava
         ),
         sharedServer.initialBrokerMetadataLoadFaultHandler,
-        sharedServer.metadataPublishingFaultHandler
+        sharedServer.metadataPublishingFaultHandler,
+        sharePartitionManager
       )
       // If the BrokerLifecycleManager's initial catch-up future fails, it 
means we timed out
       // or are shutting down before we could catch up. Therefore, also fail 
the firstPublishFuture.
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 48fd6c95f6c..a62f990d3a0 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -20,6 +20,7 @@ package kafka.server.metadata
 import java.util.OptionalInt
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.LogManager
+import kafka.server.share.SharePartitionManager
 import kafka.server.{KafkaConfig, ReplicaManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
@@ -32,7 +33,8 @@ import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
 import org.apache.kafka.metadata.publisher.AclPublisher
-import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
+import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, 
ShareVersion}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
 
@@ -80,6 +82,7 @@ class BrokerMetadataPublisher(
   aclPublisher: AclPublisher,
   fatalFaultHandler: FaultHandler,
   metadataPublishingFaultHandler: FaultHandler,
+  sharePartitionManager: SharePartitionManager
 ) extends MetadataPublisher with Logging {
   logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
 
@@ -100,6 +103,11 @@ class BrokerMetadataPublisher(
    */
   val firstPublishFuture = new CompletableFuture[Void]
 
+  /**
+   * The share version being used in the broker metadata.
+   */
+  private var finalizedShareVersion: Short = 
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)
+
   override def name(): String = "BrokerMetadataPublisher"
 
   override def onMetadataUpdate(
@@ -242,6 +250,23 @@ class BrokerMetadataPublisher(
       if (_firstPublish) {
         finishInitializingReplicaManager()
       }
+
+      if (delta.featuresDelta != null) {
+        try {
+          val newFinalizedFeatures = new 
FinalizedFeatures(newImage.features.metadataVersionOrThrow, 
newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
+          // Share version feature has been toggled.
+          if 
(newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort) != finalizedShareVersion) {
+            finalizedShareVersion = 
newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)
+            val shareVersion: ShareVersion = 
ShareVersion.fromFeatureLevel(finalizedShareVersion)
+            info(s"Feature share.version has been updated to version 
$finalizedShareVersion")
+            sharePartitionManager.onShareVersionToggle(shareVersion)
+          }
+        } catch {
+          case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating share partition 
manager " +
+            s" with share version feature change in $delta", t)
+        }
+      }
+
     } catch {
       case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
         s"publishing broker metadata from $deltaName", t)
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 15b943499e8..34b8c0c2840 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -50,6 +50,7 @@ import 
org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
 import org.apache.kafka.server.share.CachedSharePartition;
@@ -185,7 +186,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testNewContextReturnsFinalContextWithoutRequestData() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -212,7 +213,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testNewContextReturnsFinalContextWithRequestData() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -242,7 +243,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void 
testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequestData() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -273,7 +274,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testNewContext() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -434,7 +435,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testZeroSizeShareSession() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -480,7 +481,7 @@ public class SharePartitionManagerTest {
     @Test
     public void testToForgetPartitions() {
         String groupId = "grp";
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -518,7 +519,7 @@ public class SharePartitionManagerTest {
     @Test
     public void testShareSessionUpdateTopicIdsBrokerSide() {
         String groupId = "grp";
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -569,7 +570,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testGetErroneousAndValidTopicIdPartitions() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -662,7 +663,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testShareFetchContextResponseSize() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -763,7 +764,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testCachedTopicPartitionsWithNoTopicPartitions() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -774,7 +775,7 @@ public class SharePartitionManagerTest {
 
     @Test
     public void testCachedTopicPartitionsForValidShareSessions() {
-        ShareSessionCache cache = new ShareSessionCache(10);
+        ShareSessionCache cache = new ShareSessionCache(10, true);
         sharePartitionManager = SharePartitionManagerBuilder.builder()
             .withCache(cache)
             .build();
@@ -2803,6 +2804,61 @@ public class SharePartitionManagerTest {
         validateRotatedListEquals(topicIdPartitions, 
resultShareFetch.topicIdPartitions(), 1);
     }
 
+    @Test
+    public void testShareSessionCacheSupportsShareGroups() {
+        ShareSessionCache cache1 = new ShareSessionCache(10, false);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache1)
+            .build();
+        // Toggle of supportsShareGroups from false to true.
+        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_1);
+        assertTrue(cache1.supportsShareGroups());
+
+        ShareSessionCache cache2 = new ShareSessionCache(10, true);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache2)
+            .build();
+        // Toggle of supportsShareGroups from true to false.
+        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
+        assertFalse(cache2.supportsShareGroups());
+    }
+
+    @Test
+    public void testOnShareVersionToggle() {
+        String groupId = "grp";
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        SharePartition sp3 = mock(SharePartition.class);
+
+        // Mock the share partitions corresponding to the topic partitions.
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(
+            new SharePartitionKey(groupId, new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0))), sp0
+        );
+        partitionCacheMap.put(
+            new SharePartitionKey(groupId, new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0))), sp1
+        );
+        partitionCacheMap.put(
+            new SharePartitionKey(groupId, new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0))), sp2
+        );
+        partitionCacheMap.put(
+            new SharePartitionKey(groupId, new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 0))), sp3
+        );
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+        assertEquals(4, sharePartitionManager.partitionCacheSize());
+        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
+        // Because we are toggling to a share version which does not support 
share groups, the cache inside share partitions must be cleared.
+        assertEquals(0, sharePartitionManager.partitionCacheSize());
+        //Check if all share partitions have been fenced.
+        Mockito.verify(sp0).markFenced();
+        Mockito.verify(sp1).markFenced();
+        Mockito.verify(sp2).markFenced();
+        Mockito.verify(sp3).markFenced();
+    }
+
     private Timer systemTimerReaper() {
         return new SystemTimerReaper(
             TIMER_NAME_PREFIX + "-test-reaper",
@@ -3012,7 +3068,7 @@ public class SharePartitionManagerTest {
         private final Persister persister = new NoOpStatePersister();
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
         private Time time = new MockTime();
-        private ShareSessionCache cache = new ShareSessionCache(10);
+        private ShareSessionCache cache = new ShareSessionCache(10, true);
         private Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
         private Timer timer = new MockTimer();
         private ShareGroupMetrics shareGroupMetrics = new 
ShareGroupMetrics(time);
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 20e5072cdd8..82045896b4f 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -20,25 +20,27 @@ package kafka.server.metadata
 import kafka.coordinator.transaction.TransactionCoordinator
 
 import java.util.Collections.{singleton, singletonList, singletonMap}
-import java.util.Properties
+import java.util.{OptionalInt, Properties}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import kafka.log.LogManager
+import kafka.server.share.SharePartitionManager
 import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, 
NewTopic}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
+import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataImageTest, MetadataProvenance}
+import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, 
MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, 
ScramImage, TopicsImage}
 import org.apache.kafka.image.loader.LogDeltaManifest
 import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.LeaderAndEpoch
-import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
ShareVersion}
 import org.apache.kafka.server.fault.FaultHandler
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -48,6 +50,7 @@ import org.mockito.Mockito.{doThrow, mock, verify}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
+import java.util
 import java.util.concurrent.TimeUnit
 import scala.jdk.CollectionConverters._
 
@@ -204,7 +207,8 @@ class BrokerMetadataPublisherTest {
       mock(classOf[DelegationTokenPublisher]),
       mock(classOf[AclPublisher]),
       faultHandler,
-      faultHandler
+      faultHandler,
+      mock(classOf[SharePartitionManager])
     )
 
     val image = MetadataImage.EMPTY
@@ -223,4 +227,68 @@ class BrokerMetadataPublisherTest {
 
     verify(groupCoordinator).onNewMetadataImage(image, delta)
   }
+
+  @Test
+  def testNewShareVersionPushedToSharePartitionManager(): Unit = {
+    val sharePartitionManager = mock(classOf[SharePartitionManager])
+    val faultHandler = mock(classOf[FaultHandler])
+
+    val metadataPublisher = new BrokerMetadataPublisher(
+      KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)),
+      new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_1),
+      mock(classOf[LogManager]),
+      mock(classOf[ReplicaManager]),
+      mock(classOf[GroupCoordinator]),
+      mock(classOf[TransactionCoordinator]),
+      mock(classOf[ShareCoordinator]),
+      mock(classOf[DynamicConfigPublisher]),
+      mock(classOf[DynamicClientQuotaPublisher]),
+      mock(classOf[DynamicTopicClusterQuotaPublisher]),
+      mock(classOf[ScramPublisher]),
+      mock(classOf[DelegationTokenPublisher]),
+      mock(classOf[AclPublisher]),
+      faultHandler,
+      faultHandler,
+      sharePartitionManager
+    )
+
+    val featuresImage = new FeaturesImage(
+      util.Map.of(
+        MetadataVersion.FEATURE_NAME, 
MetadataVersion.latestTesting().featureLevel(),
+        ShareVersion.FEATURE_NAME, ShareVersion.SV_1.featureLevel()
+      ),
+      MetadataVersion.latestTesting())
+
+    val image = new MetadataImage(
+      MetadataProvenance.EMPTY,
+      featuresImage,
+      ClusterImageTest.IMAGE1,
+      TopicsImage.EMPTY,
+      ConfigurationsImage.EMPTY,
+      ClientQuotasImage.EMPTY,
+      ProducerIdsImage.EMPTY,
+      AclsImage.EMPTY,
+      ScramImage.EMPTY,
+      DelegationTokenImage.EMPTY
+    )
+
+    // Share version 1 is getting passed to features delta.
+    val delta = new MetadataDelta(image)
+    delta.replay(new 
FeatureLevelRecord().setName(ShareVersion.FEATURE_NAME).setFeatureLevel(1))
+
+    metadataPublisher.onMetadataUpdate(
+      delta,
+      image,
+      LogDeltaManifest.newBuilder().
+        provenance(MetadataProvenance.EMPTY).
+        leaderAndEpoch(new LeaderAndEpoch(OptionalInt.of(1), 1)).
+        numBatches(1).
+        elapsedNs(1L).
+        numBytes(1).
+        build()
+    )
+
+    // SharePartitionManager is receiving the latest changes.
+    verify(sharePartitionManager).onShareVersionToggle(any())
+  }
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index f0f37d9ec7d..e57a7c8ed9c 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -29,6 +29,7 @@ import com.yammer.metrics.core.Meter;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Caches share sessions.
@@ -60,10 +61,15 @@ public class ShareSessionCache {
     private final Map<ShareSessionKey, ShareSession> sessions = new 
HashMap<>();
 
     private final Map<String, ShareSessionKey> connectionIdToSessionMap;
+    /**
+     * Flag indicating if share groups have been turned on.
+     */
+    private final AtomicBoolean supportsShareGroups;
 
     @SuppressWarnings("this-escape")
-    public ShareSessionCache(int maxEntries) {
+    public ShareSessionCache(int maxEntries, boolean supportsShareGroups) {
         this.maxEntries = maxEntries;
+        this.supportsShareGroups = new AtomicBoolean(supportsShareGroups);
         // Register metrics for ShareSessionCache.
         KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", 
"ShareSessionCache");
         metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
@@ -90,6 +96,14 @@ public class ShareSessionCache {
         return sessions.size();
     }
 
+    /**
+     * Remove all the share sessions from cache.
+     */
+    public synchronized void removeAllSessions() {
+        sessions.clear();
+        numPartitions = 0;
+    }
+
     public synchronized long totalPartitions() {
         return numPartitions;
     }
@@ -121,7 +135,9 @@ public class ShareSessionCache {
      * @param session  The session.
      */
     public synchronized void updateNumPartitions(ShareSession session) {
-        numPartitions += session.updateCachedSize();
+        if (supportsShareGroups.get()) {
+            numPartitions += session.updateCachedSize();
+        }
     }
 
     /**
@@ -138,7 +154,7 @@ public class ShareSessionCache {
         ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
         String clientConnectionId
     ) {
-        if (sessions.size() < maxEntries) {
+        if (sessions.size() < maxEntries && supportsShareGroups.get()) {
             ShareSession session = new ShareSession(new 
ShareSessionKey(groupId, memberId), partitionMap,
                 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
             sessions.put(session.key(), session);
@@ -173,4 +189,17 @@ public class ShareSessionCache {
             }
         }
     }
+
+    /**
+     * Update the value of supportsShareGroups to reflect if share groups are 
turned on.
+     * @param supportsShareGroups - Boolean indicating if share groups are 
turned on.
+     */
+    public void updateSupportsShareGroups(boolean supportsShareGroups) {
+        this.supportsShareGroups.set(supportsShareGroups);
+    }
+
+    // Visible for testing.
+    public boolean supportsShareGroups() {
+        return supportsShareGroups.get();
+    }
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
 
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index c9692063b5c..7022fbb59fa 100644
--- 
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -43,7 +43,7 @@ public class ShareSessionCacheTest {
 
     @Test
     public void testShareSessionCache() throws InterruptedException {
-        ShareSessionCache cache = new ShareSessionCache(3);
+        ShareSessionCache cache = new ShareSessionCache(3, true);
         assertEquals(0, cache.size());
         ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(10), "conn-1");
         ShareSessionKey key2 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(20), "conn-2");
@@ -57,7 +57,7 @@ public class ShareSessionCacheTest {
 
     @Test
     public void testResizeCachedSessions() throws InterruptedException {
-        ShareSessionCache cache = new ShareSessionCache(2);
+        ShareSessionCache cache = new ShareSessionCache(2, true);
         assertEquals(0, cache.size());
         assertEquals(0, cache.totalPartitions());
         ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-1");
@@ -111,7 +111,7 @@ public class ShareSessionCacheTest {
 
     @Test
     public void testRemoveConnection() throws InterruptedException {
-        ShareSessionCache cache = new ShareSessionCache(3);
+        ShareSessionCache cache = new ShareSessionCache(3, true);
         assertEquals(0, cache.size());
         ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(1), "conn-1");
         ShareSessionKey key2 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-2");
@@ -139,6 +139,21 @@ public class ShareSessionCacheTest {
         assertMetricsValues(3, 9, 1, cache);
     }
 
+    @Test
+    public void testRemoveAllSessions() {
+        ShareSessionCache cache = new ShareSessionCache(3, true);
+        assertEquals(0, cache.size());
+        assertEquals(0, cache.totalPartitions());
+        cache.maybeCreateSession("grp", Uuid.randomUuid(), 
mockedSharePartitionMap(10), "conn-1");
+        cache.maybeCreateSession("grp", Uuid.randomUuid(), 
mockedSharePartitionMap(20), "conn-2");
+        cache.maybeCreateSession("grp", Uuid.randomUuid(), 
mockedSharePartitionMap(30), "conn-3");
+        assertEquals(3, cache.size());
+        assertEquals(60, cache.totalPartitions());
+        cache.removeAllSessions();
+        assertEquals(0, cache.size());
+        assertEquals(0, cache.totalPartitions());
+    }
+
     private ImplicitLinkedHashCollection<CachedSharePartition> 
mockedSharePartitionMap(int size) {
         ImplicitLinkedHashCollection<CachedSharePartition> cacheMap = new
                 ImplicitLinkedHashCollection<>(size);

Reply via email to