This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a670e2b207a KAFKA-20159: Group configuration for 
share.delivery.count.limit (#21491)
a670e2b207a is described below

commit a670e2b207a7b29f6faa93b998c303c5cbdc5b2c
Author: TaiJuWu <[email protected]>
AuthorDate: Sat Feb 21 05:47:19 2026 +0800

    KAFKA-20159: Group configuration for share.delivery.count.limit (#21491)
    
    Make `share.delivery.count.limit` become dynamic config, we can change
    the value at run time and set to specific group.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 126 +++++++++++++++++++++
 .../java/kafka/server/share/ShareFetchUtils.java   |  16 +++
 .../java/kafka/server/share/SharePartition.java    |  53 ++++++---
 .../kafka/server/share/ShareFetchUtilsTest.java    |  20 ++++
 .../kafka/server/share/SharePartitionTest.java     | 108 ++++++++++++++++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../kafka/coordinator/group/GroupConfig.java       |  24 ++++
 .../group/modern/share/ShareGroupConfig.java       |  35 +++++-
 .../kafka/coordinator/group/GroupConfigTest.java   |  18 ++-
 .../group/modern/share/ShareGroupConfigTest.java   |  20 ++++
 10 files changed, 400 insertions(+), 23 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index b67202e29fb..e3de0ea3442 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -3888,6 +3888,119 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testDynamicDeliveryCountLimitDecreaseArchivesRecords() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            // Produce 1 record.
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(),
+                null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            // Consume and release twice (deliveryCount becomes 2).
+            // Delivery 1: acquire (deliveryCount=1) → release (1 < default 
limit 5 → AVAILABLE).
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Delivery 2: acquire (deliveryCount=2) → release (2 < 5 → 
AVAILABLE).
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            assertEquals((short) 2, 
records.records(tp).get(0).deliveryCount().get());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Dynamically decrease delivery count limit to 3 via group config.
+            alterShareDeliveryCountLimit("group1", "3");
+
+            // Delivery 3: acquire (deliveryCount=3) → release (3 >= 3 → 
ARCHIVED).
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            assertEquals((short) 3, 
records.records(tp).get(0).deliveryCount().get());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Next poll should have no records because the record was 
archived.
+            records = shareConsumer.poll(Duration.ofMillis(2500L));
+            assertTrue(records.isEmpty(),
+                "Records should be empty as the record was archived. But 
received: " + records.count());
+        }
+    }
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(key = "group.share.delivery.count.limit", 
value = "3"),
+        }
+    )
+    public void testDynamicDeliveryCountLimitIncreaseAllowsMoreDeliveries() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            // Produce 1 record.
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(),
+                null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            // Consume and release twice (deliveryCount becomes 2).
+            // Delivery 1: acquire (deliveryCount=1) → release (1 < broker 
limit 3 → AVAILABLE).
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Delivery 2: acquire (deliveryCount=2) → release (2 < 3 → 
AVAILABLE).
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            assertEquals((short) 2, 
records.records(tp).get(0).deliveryCount().get());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Dynamically increase delivery count limit to 5 via group config.
+            alterShareDeliveryCountLimit("group1", "5");
+
+            // Delivery 3: acquire (deliveryCount=3) → release (3 < 5 → 
AVAILABLE).
+            // Without the config increase, 3 >= 3 would have caused archival.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            assertEquals((short) 3, 
records.records(tp).get(0).deliveryCount().get());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Delivery 4: acquire (deliveryCount=4) → release (4 < 5 → 
AVAILABLE).
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            assertEquals((short) 4, 
records.records(tp).get(0).deliveryCount().get());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Delivery 5: acquire (deliveryCount=5) → release (5 >= 5 → 
ARCHIVED).
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+            assertEquals((short) 5, 
records.records(tp).get(0).deliveryCount().get());
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+
+            // Next poll should have no records because the record was 
archived.
+            records = shareConsumer.poll(Duration.ofMillis(2500L));
+            assertTrue(records.isEmpty(),
+                "Records should be empty as the record was archived. But 
received: " + records.count());
+        }
+    }
+
     /**
      * Util class to encapsulate state for a consumer/producer
      * being executed by an {@link ExecutorService}.
@@ -4186,6 +4299,19 @@ public class ShareConsumerTest {
         }
     }
 
+    private void alterShareDeliveryCountLimit(String groupId, String newValue) 
{
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
+        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
+        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
+            GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, newValue), 
AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+        try (Admin adminClient = createAdminClient()) {
+            assertDoesNotThrow(() -> 
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                .all()
+                .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+        }
+    }
+
     private void alterShareIsolationLevel(String groupId, String newValue) {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index deb6e7052d5..e16a2a43a63 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -275,6 +275,22 @@ public class ShareFetchUtils {
         return defaultValue;
     }
 
+    /**
+     * The method is used to get the delivery count limit for the group. If 
the group config is present,
+     * then the delivery count limit is returned. Otherwise, the default value 
is returned.
+     *
+     * @param groupConfigManager The group config manager.
+     * @param groupId The group id for which the delivery count limit is to be 
fetched.
+     * @param defaultValue The default value to be returned if the group 
config is not present.
+     * @return The delivery count limit for the group.
+     */
+    public static int deliveryCountLimitOrDefault(GroupConfigManager 
groupConfigManager, String groupId, int defaultValue) {
+        if (groupConfigManager.groupConfig(groupId).isPresent()) {
+            return 
groupConfigManager.groupConfig(groupId).get().shareDeliveryCountLimit();
+        }
+        return defaultValue;
+    }
+
     /**
      * Merges contiguous AcquiredRecords with the same delivery count into 
single records.
      * <p>
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 3df03e9853e..fad0618460b 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -91,6 +91,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static kafka.server.share.ShareFetchUtils.deliveryCountLimitOrDefault;
 import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
@@ -200,17 +201,13 @@ public class SharePartition {
     private final int maxInFlightRecords;
 
     /**
+     * This is the default value which is used unless the group has a 
configuration which overrides it.
      * The max delivery count is used to limit the number of times a record 
can be delivered to the
      * consumer. The max delivery count is used to prevent the consumer 
re-delivering the same record
      * indefinitely.
      */
-    private final int maxDeliveryCount;
+    private final int defaultMaxDeliveryCount;
 
-    /**
-     * Records whose delivery count exceeds this are deemed abnormal and the 
batching of these records
-     * should be reduced. The limit is set to half of maxDeliveryCount rounded 
up, with a minimum of 2.
-     */
-    private final int throttleRecordsDeliveryLimit;
     /**
      * The group config manager is used to retrieve the values for dynamic 
group configurations
      */
@@ -335,7 +332,7 @@ public class SharePartition {
         TopicIdPartition topicIdPartition,
         int leaderEpoch,
         int maxInFlightRecords,
-        int maxDeliveryCount,
+        int defaultMaxDeliveryCount,
         int defaultRecordLockDurationMs,
         Timer timer,
         Time time,
@@ -344,7 +341,7 @@ public class SharePartition {
         GroupConfigManager groupConfigManager,
         SharePartitionListener listener
     ) {
-        this(groupId, topicIdPartition, leaderEpoch, maxInFlightRecords, 
maxDeliveryCount, defaultRecordLockDurationMs,
+        this(groupId, topicIdPartition, leaderEpoch, maxInFlightRecords, 
defaultMaxDeliveryCount, defaultRecordLockDurationMs,
             timer, time, persister, replicaManager, groupConfigManager, 
SharePartitionState.EMPTY, listener,
             new SharePartitionMetrics(groupId, topicIdPartition.topic(), 
topicIdPartition.partition()));
     }
@@ -356,7 +353,7 @@ public class SharePartition {
         TopicIdPartition topicIdPartition,
         int leaderEpoch,
         int maxInFlightRecords,
-        int maxDeliveryCount,
+        int defaultMaxDeliveryCount,
         int defaultRecordLockDurationMs,
         Timer timer,
         Time time,
@@ -371,8 +368,7 @@ public class SharePartition {
         this.topicIdPartition = topicIdPartition;
         this.leaderEpoch = leaderEpoch;
         this.maxInFlightRecords = maxInFlightRecords;
-        this.maxDeliveryCount = maxDeliveryCount;
-        this.throttleRecordsDeliveryLimit = 
Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int) Math.ceil((double) 
maxDeliveryCount / 2));
+        this.defaultMaxDeliveryCount = defaultMaxDeliveryCount;
         this.cachedState = new ConcurrentSkipListMap<>();
         this.lock = new ReentrantReadWriteLock();
         this.findNextFetchOffset = false;
@@ -927,7 +923,7 @@ public class SharePartition {
                     continue;
                 }
 
-                InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE, maxDeliveryCount, memberId);
+                InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId);
                 if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.info("Unable to acquire records for the batch: {} in 
share partition: {}-{}",
                         inFlightBatch, groupId, topicIdPartition);
@@ -1117,7 +1113,7 @@ public class SharePartition {
                 InFlightState updateResult = 
offsetState.getValue().startStateTransition(
                         offsetState.getKey() < startOffset ? 
RecordState.ARCHIVED : recordState,
                         DeliveryCountOps.NO_OP,
-                        this.maxDeliveryCount,
+                        this.maxDeliveryCount(),
                         EMPTY_MEMBER_ID
                 );
                 if (updateResult == null) {
@@ -1159,7 +1155,7 @@ public class SharePartition {
             InFlightState updateResult = 
inFlightBatch.startBatchStateTransition(
                     inFlightBatch.lastOffset() < startOffset ? 
RecordState.ARCHIVED : recordState,
                     DeliveryCountOps.NO_OP,
-                    this.maxDeliveryCount,
+                    this.maxDeliveryCount(),
                     EMPTY_MEMBER_ID
             );
             if (updateResult == null) {
@@ -1924,6 +1920,8 @@ public class SharePartition {
         int acquiredCount = 0;
         long maxFetchRecordsWhileThrottledRecords = -1;
         boolean hasThrottledRecord = false;
+        int maxDeliveryCount = maxDeliveryCount();
+        int throttleRecordsDeliveryLimit = 
throttleRecordsDeliveryLimit(maxDeliveryCount);
         List<AcquiredRecords> offsetAcquiredRecords = new ArrayList<>();
         lock.writeLock().lock();
         try {
@@ -2056,6 +2054,7 @@ public class SharePartition {
      * @return True if the batch should be throttled (delivery count >= 
threshold), false otherwise.
      */
     private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, 
long requestFirstOffset, long requestLastOffset) {
+        int throttleRecordsDeliveryLimit = 
throttleRecordsDeliveryLimit(maxDeliveryCount());
         if (inFlightBatch.offsetState() == null) {
             // If offsetState is null, it means the batch is not split and 
represents a single batch.
             // Check if the batch is in AVAILABLE state and has no ongoing 
transition.
@@ -2322,7 +2321,7 @@ public class SharePartition {
                     InFlightState updateResult = 
offsetState.getValue().startStateTransition(
                         recordState,
                         DeliveryCountOps.NO_OP,
-                        this.maxDeliveryCount,
+                        this.maxDeliveryCount(),
                         EMPTY_MEMBER_ID
                     );
 
@@ -2396,7 +2395,7 @@ public class SharePartition {
             InFlightState updateResult = 
inFlightBatch.startBatchStateTransition(
                 recordState,
                 DeliveryCountOps.NO_OP,
-                this.maxDeliveryCount,
+                this.maxDeliveryCount(),
                 EMPTY_MEMBER_ID
             );
             if (updateResult == null) {
@@ -2933,7 +2932,7 @@ public class SharePartition {
             InFlightState updateResult = inFlightBatch.tryUpdateBatchState(
                     inFlightBatch.lastOffset() < startOffset ? 
RecordState.ARCHIVED : RecordState.AVAILABLE,
                     DeliveryCountOps.NO_OP,
-                    maxDeliveryCount,
+                    maxDeliveryCount(),
                     EMPTY_MEMBER_ID);
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for 
the batch: {}"
@@ -2987,7 +2986,7 @@ public class SharePartition {
             InFlightState updateResult = offsetState.getValue().tryUpdateState(
                     offsetState.getKey() < startOffset ? RecordState.ARCHIVED 
: RecordState.AVAILABLE,
                     DeliveryCountOps.NO_OP,
-                    maxDeliveryCount,
+                    maxDeliveryCount(),
                     EMPTY_MEMBER_ID);
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for 
the offset: {} in batch: {}"
@@ -3309,6 +3308,24 @@ public class SharePartition {
         return deliveryCompleteCount.get();
     }
 
+    /**
+     * Returns the effective max delivery count for this share partition, 
using the per-group dynamic
+     * config if available, otherwise the broker default.
+     */
+    int maxDeliveryCount() {
+        return deliveryCountLimitOrDefault(groupConfigManager, groupId, 
defaultMaxDeliveryCount);
+    }
+
+    /**
+     * Returns the throttle records delivery limit, computed as half of the 
effective max delivery
+     * count rounded up, with a minimum of {@link 
#MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT}.
+     */
+    private static int throttleRecordsDeliveryLimit(int maxDeliveryCount) {
+        return Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int) 
Math.ceil((double) maxDeliveryCount / 2));
+    }
+
+
+
     /**
      * The GapWindow class is used to record the gap start and end offset of 
the probable gaps
      * of available records which are neither known to Persister nor to 
SharePartition. Share Partition
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 08ce763b8ad..0b6239ec68b 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -727,6 +729,24 @@ public class ShareFetchUtilsTest {
         assertArrayEquals(input.toArray(), result.toArray());
     }
 
+    @Test
+    void testDeliveryCountLimitOrDefaultWithGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        GroupConfig groupConfig = mock(GroupConfig.class);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+
+        assertEquals(8, 
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group", 
5));
+    }
+
+    @Test
+    void testDeliveryCountLimitOrDefaultWithoutGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+
+        assertEquals(5, 
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group", 
5));
+    }
+
     private static class RecordsArgumentsProvider implements ArgumentsProvider 
{
         @Override
         public Stream<? extends Arguments> provideArguments(ExtensionContext 
context) throws Exception {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 00c080f695b..88b173c5239 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -12246,6 +12246,114 @@ public class SharePartitionTest {
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
     }
 
+    @Test
+    public void testMaxDeliveryCountUsesGroupConfigWhenPresent() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxDeliveryCount(5)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // maxDeliveryCount() should return the group config value, not the 
default.
+        assertEquals(8, sharePartition.maxDeliveryCount());
+    }
+
+    @Test
+    public void testMaxDeliveryCountFallsBackToDefaultWhenNoGroupConfig() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxDeliveryCount(5)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // maxDeliveryCount() should return the default value.
+        assertEquals(5, sharePartition.maxDeliveryCount());
+    }
+
+    @Test
+    public void testDynamicDeliveryCountDecreaseCausesArchival() {
+        // Start with a high default limit so records are not archived 
initially.
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxDeliveryCount(10)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(5, 10);
+
+        // Acquire and release: deliveryCount becomes 1, then released to 
AVAILABLE.
+        fetchAcquiredRecords(sharePartition, records, 10);
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        // Acquire again: deliveryCount becomes 2, state=ACQUIRED.
+        fetchAcquiredRecords(sharePartition, records, 10);
+
+        // Dynamically decrease the limit to 2 via group config BEFORE 
releasing.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(2);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        // Release: archival check fires because deliveryCount(2) >= 
maxDeliveryCount(2),
+        // so records transition to ARCHIVED instead of AVAILABLE.
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        // Next fetch should return 0 records since all records are archived.
+        fetchAcquiredRecords(sharePartition, records, 0);
+
+        // Records should have been archived - next fetch offset should have 
advanced.
+        assertEquals(15, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() {
+        // Start with limit = 2, records get archived after 2 deliveries.
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxDeliveryCount(2)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(5, 10);
+
+        // First acquire: deliveryCount = 1.
+        fetchAcquiredRecords(sharePartition, records, 10);
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        // Now increase limit to 10 via group config before the second acquire.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(10);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        // Second acquire: deliveryCount = 2. With old limit (2) this would 
archive.
+        // With new limit (10) it should stay acquirable.
+        fetchAcquiredRecords(sharePartition, records, 10);
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        // Third acquire should still work since limit is now 10.
+        fetchAcquiredRecords(sharePartition, records, 10);
+
+        // Records are still in the cached state, not archived.
+        assertFalse(sharePartition.cachedState().isEmpty());
+    }
+
     private static class SharePartitionBuilder {
 
         private int defaultAcquisitionLockTimeoutMs = 30000;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fad3681091a..7ac893f378d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -359,6 +359,7 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
+    cgConfigs.put(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT.toString)
     cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, 
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
     cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG, 
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
     cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index a012d1469b2..bff700cd73d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -52,6 +52,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = 
"share.record.lock.duration.ms";
 
+    public static final String SHARE_DELIVERY_COUNT_LIMIT_CONFIG = 
"share.delivery.count.limit";
+
     public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = 
"share.auto.offset.reset";
     public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = 
ShareGroupAutoOffsetResetStrategy.LATEST.name();
     public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to 
initialize the share-partition start offset. " +
@@ -87,6 +89,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public final int shareRecordLockDurationMs;
 
+    public final int shareDeliveryCountLimit;
+
     public final String shareAutoOffsetReset;
 
     public final int streamsSessionTimeoutMs;
@@ -130,6 +134,12 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(1000),
             MEDIUM,
             ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
+        .define(SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+            INT,
+            ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT,
+            atLeast(2),
+            MEDIUM,
+            ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
         .define(SHARE_AUTO_OFFSET_RESET_CONFIG,
             STRING,
             SHARE_AUTO_OFFSET_RESET_DEFAULT,
@@ -174,6 +184,7 @@ public final class GroupConfig extends AbstractConfig {
         this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
         this.shareHeartbeatIntervalMs = 
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareRecordLockDurationMs = 
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+        this.shareDeliveryCountLimit = 
getInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
         this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
         this.streamsSessionTimeoutMs = 
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
         this.streamsHeartbeatIntervalMs = 
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -216,6 +227,7 @@ public final class GroupConfig extends AbstractConfig {
         int shareHeartbeatInterval = (Integer) 
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         int shareSessionTimeout = (Integer) 
valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
         int shareRecordLockDurationMs = (Integer) 
valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+        int shareDeliveryCountLimit = (Integer) 
valueMaps.get(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
         int streamsSessionTimeoutMs = (Integer) 
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
         int streamsHeartbeatIntervalMs = (Integer) 
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         int streamsNumStandbyReplicas = (Integer) 
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
@@ -259,6 +271,14 @@ public final class GroupConfig extends AbstractConfig {
             throw new 
InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be 
less than or equal to " +
                 
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
         }
+        if (shareDeliveryCountLimit < 
shareGroupConfig.shareGroupMinDeliveryCountLimit()) {
+            throw new 
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be 
greater than or equal to " +
+                    
ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
+        }
+        if (shareDeliveryCountLimit > 
shareGroupConfig.shareGroupMaxDeliveryCountLimit()) {
+            throw new 
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be 
less than or equal to " +
+                    
ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
+        }
         if (streamsHeartbeatIntervalMs < 
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
             throw new 
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
                 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -355,6 +375,10 @@ public final class GroupConfig extends AbstractConfig {
         return shareHeartbeatIntervalMs;
     }
 
+    public int shareDeliveryCountLimit() {
+        return shareDeliveryCountLimit;
+    }
+
     /**
      * The share group record lock duration milliseconds.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 45e174d1a59..761a2dcb7fc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -47,17 +47,25 @@ public class ShareGroupConfig {
     public static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT = 5;
     public static final String SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC = "The 
maximum number of delivery attempts for a record delivered to a share group.";
 
+    public static final String SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG = 
"group.share.max.delivery.count.limit";
+    public static final int SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT = 10;
+    public static final String SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC = "The 
maximum value of a group configuration for the maximum number of delivery 
attempts for a record delivered to a share group.";
+
+    public static final String SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG = 
"group.share.min.delivery.count.limit";
+    public static final int SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT = 2;
+    public static final String SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DOC = "The 
minimum value of a group configuration for the maximum number of delivery 
attempts for a record delivered to a share group.";
+
     public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG = 
"group.share.record.lock.duration.ms";
     public static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT = 
30000;
     public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC = "The 
record acquisition lock duration in milliseconds for share groups.";
 
     public static final String SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG 
= "group.share.min.record.lock.duration.ms";
     public static final int SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT = 
15000;
-    public static final String SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC = 
"The record acquisition lock minimum duration in milliseconds for share 
groups.";
+    public static final String SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC = 
"The minimum value of a group configuration for record acquisition lock 
duration in milliseconds.";
 
     public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG 
= "group.share.max.record.lock.duration.ms";
     public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 
60000;
-    public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = 
"The record acquisition lock maximum duration in milliseconds for share 
groups.";
+    public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = 
"The maximum value of a group configuration for record acquisition lock 
duration in milliseconds.";
 
     public static final String 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG = 
"share.fetch.purgatory.purge.interval.requests";
     public static final int 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT = 1000;
@@ -75,6 +83,8 @@ public class ShareGroupConfig {
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, 
SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
             .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
+            .define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM, 
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
+            .define(SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 5), MEDIUM, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DOC)
             .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, 
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 3600000), MEDIUM, 
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
             .define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
             .define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, 
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), 
MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
@@ -86,6 +96,8 @@ public class ShareGroupConfig {
     private final boolean isShareGroupEnabled;
     private final int shareGroupPartitionMaxRecordLocks;
     private final int shareGroupDeliveryCountLimit;
+    private final int shareGroupMaxDeliveryCountLimit;
+    private final int shareGroupMinDeliveryCountLimit;
     private final int shareGroupRecordLockDurationMs;
     private final int shareGroupMaxRecordLockDurationMs;
     private final int shareGroupMinRecordLockDurationMs;
@@ -100,6 +112,8 @@ public class ShareGroupConfig {
         isShareGroupEnabled = 
config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG);
         shareGroupPartitionMaxRecordLocks = 
config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
         shareGroupDeliveryCountLimit = 
config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG);
+        shareGroupMaxDeliveryCountLimit = 
config.getInt(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
+        shareGroupMinDeliveryCountLimit = 
config.getInt(SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
         shareGroupRecordLockDurationMs = 
config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG);
         shareGroupMaxRecordLockDurationMs = 
config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
         shareGroupMinRecordLockDurationMs = 
config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
@@ -122,6 +136,14 @@ public class ShareGroupConfig {
         return shareGroupDeliveryCountLimit;
     }
 
+    public int shareGroupMaxDeliveryCountLimit() {
+        return shareGroupMaxDeliveryCountLimit;
+    }
+
+    public int shareGroupMinDeliveryCountLimit() {
+        return shareGroupMinDeliveryCountLimit;
+    }
+
     public int shareGroupRecordLockDurationMs() {
         return shareGroupRecordLockDurationMs;
     }
@@ -147,6 +169,12 @@ public class ShareGroupConfig {
     }
 
     private void validate() {
+        Utils.require(shareGroupMaxDeliveryCountLimit >= 
shareGroupDeliveryCountLimit,
+                String.format("%s must be greater than or equal to %s",
+                        SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG));
+        Utils.require(shareGroupDeliveryCountLimit >= 
shareGroupMinDeliveryCountLimit,
+                String.format("%s must be greater than or equal to %s",
+                        SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG));
         Utils.require(shareGroupRecordLockDurationMs >= 
shareGroupMinRecordLockDurationMs,
                 String.format("%s must be greater than or equal to %s",
                         SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG));
@@ -168,7 +196,8 @@ public class ShareGroupConfig {
         return Map.of(
             GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
             GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs(),
-            GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupRecordLockDurationMs()
+            GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupRecordLockDurationMs(),
+            GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
shareGroupDeliveryCountLimit()
         );
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index a6837926818..6f5e1cf1f13 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -41,6 +41,8 @@ public class GroupConfigTest {
     private static final boolean SHARE_GROUP_ENABLE = true;
     private static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS = 200;
     private static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT = 5;
+    private static final int SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT = 2;
+    private static final int SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT = 10;
     private static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS = 30000;
     private static final int SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS = 15000;
     private static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS = 60000;
@@ -171,6 +173,16 @@ public class GroupConfigTest {
         doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
+        // Check for invalid shareDeliveryCountLimit, < MIN
+        props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "1");
+        doTestInvalidProps(props, ConfigException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid shareDeliveryCountLimit, > MAX
+        props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "11");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
         // Check for invalid shareAutoOffsetReset
         props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
         doTestInvalidProps(props, ConfigException.class);
@@ -234,6 +246,7 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
"2000");
+        defaultValue.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "2");
         defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
         defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
         defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
"10");
@@ -250,6 +263,7 @@ public class GroupConfigTest {
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
+        assertEquals(2, 
config.getInt(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG));
         assertEquals("latest", 
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
         assertEquals("read_uncommitted", 
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
@@ -272,6 +286,7 @@ public class GroupConfigTest {
         props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
         props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
         props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
+        props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "5");
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
         props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
         props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
@@ -287,7 +302,8 @@ public class GroupConfigTest {
     }
 
     private ShareGroupConfig createShareGroupConfig() {
-        return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE, 
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS, SHARE_GROUP_DELIVERY_COUNT_LIMIT,
+        return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE, 
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS,
+                SHARE_GROUP_DELIVERY_COUNT_LIMIT, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT,
             SHARE_GROUP_RECORD_LOCK_DURATION_MS, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS, 
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 73c0c0cc10e..b9969c09b63 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -39,6 +39,8 @@ public class ShareGroupConfigTest {
         configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, true);
         
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
200);
         configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 
5);
+        
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, 2);
+        
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, 9);
         
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 30000);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 
15000);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 
60000);
@@ -75,6 +77,20 @@ public class ShareGroupConfigTest {
         assertEquals("group.share.max.record.lock.duration.ms must be greater 
than or equal to group.share.record.lock.duration.ms",
             assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
 
+        configs.clear();
+        // test for when SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG is less than 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG
+        
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, 3);
+        configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 
2);
+        assertEquals("group.share.delivery.count.limit must be greater than or 
equal to group.share.min.delivery.count.limit",
+                assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        configs.clear();
+        // test for when SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG is greater 
than SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG
+        
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, 9);
+        configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 
10);
+        assertEquals("group.share.max.delivery.count.limit must be greater 
than or equal to group.share.delivery.count.limit",
+                assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
         configs.clear();
         // test for when SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG is out of 
bounds
         configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 
1);
@@ -111,6 +127,8 @@ public class ShareGroupConfigTest {
         boolean shareGroupEnable,
         int shareGroupPartitionMaxRecordLocks,
         int shareGroupDeliveryCountLimit,
+        int shareGroupMinDeliveryCountLimit,
+        int shareGroupMaxDeliveryCountLimit,
         int shareGroupRecordLockDurationsMs,
         int shareGroupMinRecordLockDurationMs,
         int shareGroupMaxRecordLockDurationMs
@@ -119,6 +137,8 @@ public class ShareGroupConfigTest {
         configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, 
shareGroupEnable);
         
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
shareGroupPartitionMaxRecordLocks);
         configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, 
shareGroupDeliveryCountLimit);
+        
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, 
shareGroupMinDeliveryCountLimit);
+        
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, 
shareGroupMaxDeliveryCountLimit);
         
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupRecordLockDurationsMs);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupMinRecordLockDurationMs);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupMaxRecordLockDurationMs);

Reply via email to