This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee2204f2130 KAFKA-20161 Group configuration for share.renew.acknowledge.enable (#21467)
ee2204f2130 is described below

commit ee2204f2130d6f7c5710f57c90c85ae835be14d6
Author: Lan Ding <[email protected]>
AuthorDate: Mon Mar 9 05:57:30 2026 +0800

    KAFKA-20161 Group configuration for share.renew.acknowledge.enable (#21467)
    
    Introduce the `share.renew.acknowledge.enable` group configuration
    (KIP-1222) to control whether renew acknowledgements are permitted for a
    share group.
    
    - Add new boolean dynamic group config
      `share.renew.acknowledge.enable` (default true) in `GroupConfig`
    - When disabled, RENEW acknowledgements are rejected with
      `INVALID_RECORD_STATE` error code
    - Update existing RENEW tests to explicitly enable the config
    - Add unit tests and integration test for the behavior when the
      config is disabled
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 42 ++++++++++
 .../java/kafka/server/share/ShareFetchUtils.java   | 33 --------
 .../java/kafka/server/share/SharePartition.java    | 25 ++++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  8 +-
 .../kafka/server/share/ShareFetchUtilsTest.java    | 20 -----
 .../kafka/server/share/SharePartitionTest.java     | 88 +++++++++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  3 +-
 .../kafka/coordinator/group/GroupConfig.java       | 20 +++++
 .../modern/share/ShareGroupConfigProvider.java     | 73 +++++++++++++++++
 .../kafka/coordinator/group/GroupConfigTest.java   | 10 +++
 .../modern/share/ShareGroupConfigProviderTest.java | 94 ++++++++++++++++++++++
 11 files changed, 351 insertions(+), 65 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index e3de0ea3442..8a08a046587 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -3082,6 +3082,35 @@ public class ShareConsumerTest {
         verifyYammerMetricCount("ackType=Renew", 0);
     }
 
+    @ClusterTest
+    public void testRenewAcknowledgementDisabled() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareRenewAcknowledgeEnable("group1", false);
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+
+            for (ConsumerRecord<byte[], byte[]> rec : records) {
+                shareConsumer.acknowledge(rec, AcknowledgeType.RENEW);
+            }
+
+            Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+            assertEquals(1, result.size());
+            Optional<KafkaException> error = result.get(new 
TopicIdPartition(tpId, tp.partition(), tp.topic()));
+            assertTrue(error.isPresent());
+            assertInstanceOf(InvalidRecordStateException.class, error.get());
+        }
+    }
+
     @ClusterTest(
         brokers = 1,
         serverProperties = {
@@ -4325,6 +4354,19 @@ public class ShareConsumerTest {
         }
     }
 
+    private void alterShareRenewAcknowledgeEnable(String groupId, boolean 
newValue) {
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
+        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
+        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
+            GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
Boolean.toString(newValue)), AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+        try (Admin adminClient = createAdminClient()) {
+            assertDoesNotThrow(() -> 
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                .all()
+                .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+        }
+    }
+
     private List<Integer> topicPartitionLeader(Admin adminClient, String 
topicName, int partition) throws InterruptedException, ExecutionException {
         return 
adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName)
             .partitions().stream()
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index e16a2a43a63..ba3f03a4cca 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -259,38 +258,6 @@ public class ShareFetchUtils {
         }
     }
 
-    /**
-     * The method is used to get the record lock duration for the group. If 
the group config is present,
-     * then the record lock duration is returned. Otherwise, the default value 
is returned.
-     *
-     * @param groupConfigManager The group config manager.
-     * @param groupId The group id for which the record lock duration is to be 
fetched.
-     * @param defaultValue The default value to be returned if the group 
config is not present.
-     * @return The record lock duration for the group.
-     */
-    public static int recordLockDurationMsOrDefault(GroupConfigManager 
groupConfigManager, String groupId, int defaultValue) {
-        if (groupConfigManager.groupConfig(groupId).isPresent()) {
-            return 
groupConfigManager.groupConfig(groupId).get().shareRecordLockDurationMs();
-        }
-        return defaultValue;
-    }
-
-    /**
-     * The method is used to get the delivery count limit for the group. If 
the group config is present,
-     * then the delivery count limit is returned. Otherwise, the default value 
is returned.
-     *
-     * @param groupConfigManager The group config manager.
-     * @param groupId The group id for which the delivery count limit is to be 
fetched.
-     * @param defaultValue The default value to be returned if the group 
config is not present.
-     * @return The delivery count limit for the group.
-     */
-    public static int deliveryCountLimitOrDefault(GroupConfigManager 
groupConfigManager, String groupId, int defaultValue) {
-        if (groupConfigManager.groupConfig(groupId).isPresent()) {
-            return 
groupConfigManager.groupConfig(groupId).get().shareDeliveryCountLimit();
-        }
-        return defaultValue;
-    }
-
     /**
      * Merges contiguous AcquiredRecords with the same delivery count into 
single records.
      * <p>
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index fad0618460b..4c8fc717f5d 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
@@ -91,11 +92,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static kafka.server.share.ShareFetchUtils.deliveryCountLimitOrDefault;
 import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
-import static kafka.server.share.ShareFetchUtils.recordLockDurationMsOrDefault;
 
 /**
  * The SharePartition is used to track the state of a partition that is shared 
between multiple
@@ -213,6 +212,11 @@ public class SharePartition {
      */
     private final GroupConfigManager groupConfigManager;
 
+    /**
+     * The provider used to retrieve share group dynamic configuration values.
+     */
+    private final ShareGroupConfigProvider configProvider;
+
     /**
      * This is the default value which is used unless the group has a 
configuration which overrides it.
      * The record lock duration is used to limit the duration for which a 
consumer can acquire a record.
@@ -381,6 +385,7 @@ public class SharePartition {
         this.partitionState = sharePartitionState;
         this.replicaManager = replicaManager;
         this.groupConfigManager = groupConfigManager;
+        this.configProvider = new ShareGroupConfigProvider(groupConfigManager);
         this.fetchOffsetMetadata = new OffsetMetadata();
         this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, 
topicIdPartition);
         this.listener = listener;
@@ -1851,7 +1856,7 @@ public class SharePartition {
                         null,
                         timeoutHandler,
                         sharePartitionMetrics);
-                    int delayMs = 
recordLockDurationMsOrDefault(groupConfigManager, groupId, 
defaultRecordLockDurationMs);
+                    int delayMs = 
configProvider.recordLockDurationMsOrDefault(groupId, 
defaultRecordLockDurationMs);
                     long lastOffset = acquiredRecords.firstOffset() + 
maxFetchRecords - 1;
                     inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, 
delayMs);
                     updateFindNextFetchOffset(true);
@@ -2298,6 +2303,11 @@ public class SharePartition {
                 byte ackType = ackTypeMap.size() > 1 ? 
ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
 
                 if (ackType == AcknowledgeType.RENEW.id) {
+                    if (!configProvider.isRenewAcknowledgeEnabled(groupId)) {
+                        log.debug("Renew acknowledge is not enabled for the 
group: {}", groupId);
+                        return Optional.of(new InvalidRecordStateException(
+                            "Renewing acquisition locks is not enabled for the 
group."));
+                    }
                     // If RENEW, renew the acquisition lock timer for this 
offset and continue without changing state.
                     // We do not care about recordState map here.
                     // Only valid for ACQUIRED offsets; the check above 
ensures this.
@@ -2371,6 +2381,11 @@ public class SharePartition {
             // Before reaching this point, it should be verified that it is 
full batch ack and
             // not per offset ack as well as startOffset not moved.
             if (ackType == AcknowledgeType.RENEW.id) {
+                if (!configProvider.isRenewAcknowledgeEnabled(groupId)) {
+                    log.debug("Renew acknowledge is not enabled for the group: 
{}", groupId);
+                    return Optional.of(new InvalidRecordStateException(
+                        "Renewing acquisition locks is not enabled for the 
group."));
+                }
                 // Renew the acquisition lock timer for the complete batch. We 
have already
                 // checked that the batchState is ACQUIRED above.
                 log.debug("Renewing acquisition lock for {}-{} with batch 
{}-{} for member {}.",
@@ -2831,7 +2846,7 @@ public class SharePartition {
         // The recordLockDuration value would depend on whether the dynamic 
config SHARE_RECORD_LOCK_DURATION_MS in
         // GroupConfig.java is set or not. If dynamic config is set, then that 
is used, otherwise the value of
         // SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG defined in 
ShareGroupConfig is used
-        int recordLockDurationMs = 
recordLockDurationMsOrDefault(groupConfigManager, groupId, 
defaultRecordLockDurationMs);
+        int recordLockDurationMs = 
configProvider.recordLockDurationMsOrDefault(groupId, 
defaultRecordLockDurationMs);
         return scheduleAcquisitionLockTimeout(memberId, firstOffset, 
lastOffset, recordLockDurationMs);
     }
 
@@ -3313,7 +3328,7 @@ public class SharePartition {
      * config if available, otherwise the broker default.
      */
     int maxDeliveryCount() {
-        return deliveryCountLimitOrDefault(groupConfigManager, groupId, 
defaultMaxDeliveryCount);
+        return configProvider.deliveryCountLimitOrDefault(groupId, 
defaultMaxDeliveryCount);
     }
 
     /**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 00659864384..00b3b11495a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinat
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.share.{ShareFetchUtils, SharePartitionManager}
+import kafka.server.share.SharePartitionManager
 import kafka.utils.Logging
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.EndpointType
@@ -57,6 +57,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider
 import org.apache.kafka.coordinator.group.{Group, GroupConfig, 
GroupConfigManager, GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
@@ -120,6 +121,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   val configManager = new ConfigAdminManager(brokerId, config, 
configRepository)
   val describeTopicPartitionsRequestHandler = new 
DescribeTopicPartitionsRequestHandler(
     metadataCache, authHelper, config)
+  val shareGroupConfigProvider = new 
ShareGroupConfigProvider(groupConfigManager)
 
   def close(): Unit = {
     aclApis.close()
@@ -4061,7 +4063,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       0,
       partitions,
       nodeEndpoints.values.toList.asJava,
-      ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager, 
groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs)
+      shareGroupConfigProvider.recordLockDurationMsOrDefault(groupId, 
config.shareGroupConfig.shareGroupRecordLockDurationMs)
     )
   }
 
@@ -4159,7 +4161,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // Prepare share fetch response
       val response =
         ShareFetchResponse.of(shareFetchResponse.error, throttleTimeMs, 
responseData, nodeEndpoints.values.toList.asJava,
-          ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager, 
groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs))
+          shareGroupConfigProvider.recordLockDurationMsOrDefault(groupId, 
config.shareGroupConfig.shareGroupRecordLockDurationMs))
       // record the bytes out metrics only when the response is being sent.
       response.data.responses.forEach { topicResponse =>
         topicResponse.partitions.forEach { data =>
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 0b6239ec68b..08ce763b8ad 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -34,8 +34,6 @@ import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.requests.FetchRequest;
-import org.apache.kafka.coordinator.group.GroupConfig;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -729,24 +727,6 @@ public class ShareFetchUtilsTest {
         assertArrayEquals(input.toArray(), result.toArray());
     }
 
-    @Test
-    void testDeliveryCountLimitOrDefaultWithGroupConfig() {
-        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
-        GroupConfig groupConfig = mock(GroupConfig.class);
-        when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
-        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
-
-        assertEquals(8, 
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group", 
5));
-    }
-
-    @Test
-    void testDeliveryCountLimitOrDefaultWithoutGroupConfig() {
-        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
-        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
-
-        assertEquals(5, 
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group", 
5));
-    }
-
     private static class RecordsArgumentsProvider implements ArgumentsProvider 
{
         @Override
         public Stream<? extends Arguments> provideArguments(ExtensionContext 
context) throws Exception {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 88b173c5239..af998054020 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -7157,7 +7157,7 @@ public class SharePartitionTest {
 
         AcquisitionLockTimerTask timerTask = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
-        Mockito.verify(groupConfigManager, 
Mockito.times(2)).groupConfig(GROUP_ID);
+        Mockito.verify(groupConfigManager, 
Mockito.times(1)).groupConfig(GROUP_ID);
         Mockito.verify(groupConfig).shareRecordLockDurationMs();
         assertEquals(expectedDurationMs, timerTask.delayMs);
     }
@@ -7179,13 +7179,13 @@ public class SharePartitionTest {
 
         AcquisitionLockTimerTask timerTask1 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
-        Mockito.verify(groupConfigManager, 
Mockito.times(2)).groupConfig(GROUP_ID);
+        Mockito.verify(groupConfigManager, 
Mockito.times(1)).groupConfig(GROUP_ID);
         Mockito.verify(groupConfig).shareRecordLockDurationMs();
         assertEquals(expectedDurationMs1, timerTask1.delayMs);
 
         AcquisitionLockTimerTask timerTask2 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
-        Mockito.verify(groupConfigManager, 
Mockito.times(4)).groupConfig(GROUP_ID);
+        Mockito.verify(groupConfigManager, 
Mockito.times(2)).groupConfig(GROUP_ID);
         Mockito.verify(groupConfig, 
Mockito.times(2)).shareRecordLockDurationMs();
         assertEquals(expectedDurationMs2, timerTask2.delayMs);
     }
@@ -10381,6 +10381,79 @@ public class SharePartitionTest {
         Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
     }
 
+    @Test
+    public void testRenewAcknowledgeDisabledWithCompleteBatchAck() {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withPersister(persister)
+            .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+            .build();
+
+        List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, 
memoryRecords(0, 1), 1);
+        assertEquals(1, records.size());
+        assertEquals(1, sharePartition.cachedState().size());
+
+        CompletableFuture<Void> future = sharePartition.acknowledge(MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(0, 0, 
List.of(AcknowledgeType.RENEW.id))));
+
+        assertTrue(future.isCompletedExceptionally());
+        try {
+            future.get();
+            fail("No exception thrown");
+        } catch (Exception e) {
+            assertNotNull(e);
+            assertInstanceOf(InvalidRecordStateException.class, e.getCause());
+            assertTrue(e.getCause().getMessage().contains("Renewing 
acquisition locks is not enabled for the group."));
+        }
+
+        // The batch should still be in ACQUIRED state since the renew was 
rejected.
+        InFlightBatch batch = sharePartition.cachedState().get(0L);
+        assertEquals(RecordState.ACQUIRED, batch.batchState());
+        Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any());
+    }
+
+    @Test
+    public void testRenewAcknowledgeDisabledWithPerOffsetAck() {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withPersister(persister)
+            .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+            .build();
+
+        List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, 
memoryRecords(0, 2), 2);
+        assertEquals(1, records.size());
+        assertEquals(1, sharePartition.cachedState().size());
+
+        CompletableFuture<Void> future = sharePartition.acknowledge(MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(0, 1,
+                List.of(AcknowledgeType.RENEW.id, 
AcknowledgeType.ACCEPT.id))));
+
+        assertTrue(future.isCompletedExceptionally());
+        try {
+            future.get();
+            fail("No exception thrown");
+        } catch (Exception e) {
+            assertNotNull(e);
+            assertInstanceOf(InvalidRecordStateException.class, e.getCause());
+            assertTrue(e.getCause().getMessage().contains("Renewing 
acquisition locks is not enabled for the group."));
+        }
+
+        // The offsets should still be in ACQUIRED state since the renew was 
rejected
+        // at the first offset. The batch has per-offset state at this point 
because the
+        // ack had per-offset ack types.
+        InFlightBatch batch = sharePartition.cachedState().get(0L);
+        assertNotNull(batch.offsetState());
+        assertEquals(RecordState.ACQUIRED, 
batch.offsetState().get(0L).state());
+        assertEquals(RecordState.ACQUIRED, 
batch.offsetState().get(1L).state());
+        Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any());
+    }
+
     @Test
     public void testAcquireSingleBatchInRecordLimitMode() throws 
InterruptedException {
         Persister persister = Mockito.mock(Persister.class);
@@ -12354,6 +12427,15 @@ public class SharePartitionTest {
         assertFalse(sharePartition.cachedState().isEmpty());
     }
 
+    private static GroupConfigManager groupConfigManagerWithRenewDisabled() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+        
Mockito.when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
+        
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(ACQUISITION_LOCK_TIMEOUT_MS);
+        return groupConfigManager;
+    }
+
     private static class SharePartitionBuilder {
 
         private int defaultAcquisitionLockTimeoutMs = 30000;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7ac893f378d..48268685da2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
S [...]
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -362,6 +362,7 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT.toString)
     cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, 
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
     cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG, 
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
+    cgConfigs.put(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT)
     cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index bff700cd73d..d7ef5c0a407 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -32,6 +32,7 @@ import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@@ -71,6 +72,10 @@ public final class GroupConfig extends AbstractConfig {
         "If set to \"read_uncommitted\", the share group will return all 
messages, even transactional messages which have been aborted. " +
         "Non-transactional records will be returned unconditionally in either 
mode.";
 
+    public static final String SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG = 
"share.renew.acknowledge.enable";
+    public static final boolean SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT = true;
+    public static final String SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC = "Whether 
the renew acknowledge type is enabled for the share group.";
+
     public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG = 
"streams.session.timeout.ms";
 
     public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG = 
"streams.heartbeat.interval.ms";
@@ -103,6 +108,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public final String shareIsolationLevel;
 
+    public final boolean shareRenewAcknowledgeEnable;
+
     private static final ConfigDef CONFIG = new ConfigDef()
         .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
             INT,
@@ -152,6 +159,11 @@ public final class GroupConfig extends AbstractConfig {
             in(IsolationLevel.READ_COMMITTED.toString(), 
IsolationLevel.READ_UNCOMMITTED.toString()),
             MEDIUM,
             SHARE_ISOLATION_LEVEL_DOC)
+        .define(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
+            BOOLEAN,
+            SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT,
+            MEDIUM,
+            SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC)
         .define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
             INT,
             GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -191,6 +203,7 @@ public final class GroupConfig extends AbstractConfig {
         this.streamsNumStandbyReplicas = 
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
         this.streamsInitialRebalanceDelayMs = 
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
         this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
+        this.shareRenewAcknowledgeEnable = 
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
     }
 
     public static ConfigDef configDef() {
@@ -434,6 +447,13 @@ public final class GroupConfig extends AbstractConfig {
             throw new IllegalArgumentException("Unknown Share isolation level: 
" + shareIsolationLevel);
         }
     }
+    
+    /**
+     * The share group renew acknowledge enable.
+     */
+    public boolean shareRenewAcknowledgeEnable() {
+        return shareRenewAcknowledgeEnable;
+    }
 
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" + 
config));
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
new file mode 100644
index 00000000000..7e4ec55e5ff
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.share;
+
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
+
+/**
+ * Resolves dynamic share group configuration values through a {@link GroupConfigManager},
+ * falling back to a default value when no group-specific configuration is present.
+ */
+public class ShareGroupConfigProvider {
+    private final GroupConfigManager manager;
+
+    public ShareGroupConfigProvider(GroupConfigManager manager) {
+        this.manager = manager;
+    }
+
+    /**
+     * Returns the record lock duration for the given group. If a group config is present for
+     * {@code groupId}, its record lock duration is returned; otherwise {@code defaultValue} is returned.
+     *
+     * @param groupId The group id for which the record lock duration is to be fetched.
+     * @param defaultValue The value to be returned if the group config is not present.
+     * @return The record lock duration for the group, or {@code defaultValue} if the group has no config.
+     */
+    public int recordLockDurationMsOrDefault(String groupId, int defaultValue) 
{
+        return manager.groupConfig(groupId).
+            map(GroupConfig::shareRecordLockDurationMs).
+            orElse(defaultValue);
+    }
+
+    /**
+     * Returns the delivery count limit for the given group. If a group config is present for
+     * {@code groupId}, its delivery count limit is returned; otherwise {@code defaultValue} is returned.
+     *
+     * @param groupId The group id for which the delivery count limit is to be fetched.
+     * @param defaultValue The value to be returned if the group config is not present.
+     * @return The delivery count limit for the group, or {@code defaultValue} if the group has no config.
+     */
+    public int deliveryCountLimitOrDefault(String groupId, int defaultValue) {
+        return manager.groupConfig(groupId)
+            .map(GroupConfig::shareDeliveryCountLimit)
+            .orElse(defaultValue);
+    }
+
+    /**
+     * Checks whether renew acknowledge is enabled for the given group. If a group config is present,
+     * its value is used; otherwise {@link GroupConfig#SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT} is used.
+     *
+     * @param groupId The group id for which the renew acknowledge enable is to be checked.
+     * @return true if renew acknowledge is enabled for the group, false otherwise.
+     */
+    public boolean isRenewAcknowledgeEnabled(String groupId) {
+        return manager.groupConfig(groupId)
+            .map(GroupConfig::shareRenewAcknowledgeEnable)
+            .orElse(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT);
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 6f5e1cf1f13..cc46f04a9b3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -64,6 +64,8 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "hello", "1.0");
             } else if (GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "hello", "1.0");
+            } else if 
(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_boolean", "1");
             } else if 
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if 
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -228,6 +230,11 @@ public class GroupConfigTest {
         // Check for invalid shareIsolationLevel.
         props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_uncommit");
         doTestInvalidProps(props, ConfigException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid shareRenewAcknowledgeEnable.
+        props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "1");
+        doTestInvalidProps(props, ConfigException.class);
     }
 
     private void doTestInvalidProps(Properties props, Class<? extends 
Exception> exceptionClassName) {
@@ -253,6 +260,7 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
"2000");
         defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
+        defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
"true");
 
         Properties props = new Properties();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@@ -270,6 +278,7 @@ public class GroupConfigTest {
         assertEquals(2000, 
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(1, 
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
         assertEquals(3000, 
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
+        assertEquals(true, 
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
     }
 
     @Test
@@ -294,6 +303,7 @@ public class GroupConfigTest {
         props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
         props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"3000");
+        props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
         return props;
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
new file mode 100644
index 00000000000..d8cd49bae79
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.share;
+
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ShareGroupConfigProviderTest { // Exercises ShareGroupConfigProvider against a mocked GroupConfigManager.
+    private ShareGroupConfigProvider provider; // Re-created inside every test; there is no shared setup method.
+
+    @Test
+    void testRecordLockDurationMsOrDefaultWithGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        GroupConfig groupConfig = mock(GroupConfig.class);
+        when(groupConfig.shareRecordLockDurationMs()).thenReturn(1000);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(1000, 
provider.recordLockDurationMsOrDefault("test-group", 100));
+    }
+
+    @Test
+    void testRecordLockDurationMsOrDefaultWithoutGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(100, provider.recordLockDurationMsOrDefault("test-group", 
100));
+    }
+
+    @Test
+    void testDeliveryCountLimitOrDefaultWithGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        GroupConfig groupConfig = mock(GroupConfig.class);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(8, provider.deliveryCountLimitOrDefault("test-group", 5));
+    }
+
+    @Test
+    void testDeliveryCountLimitOrDefaultWithoutGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(5, provider.deliveryCountLimitOrDefault("test-group", 5));
+    }
+
+    @Test
+    void testIsRenewAcknowledgeDisabledWithGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        GroupConfig groupConfig = mock(GroupConfig.class);
+        when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertFalse(provider.isRenewAcknowledgeEnabled("test-group"));
+    }
+
+    @Test
+    void testIsRenewAcknowledgeEnabledWithoutGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertTrue(provider.isRenewAcknowledgeEnabled("test-group")); // NOTE(review): passes only if GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT is true (value not visible here); the commit description says "default false" - confirm.
+    }
+}


Reply via email to