This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a670e2b207a KAFKA-20159: Group configuration for
share.delivery.count.limit (#21491)
a670e2b207a is described below
commit a670e2b207a7b29f6faa93b998c303c5cbdc5b2c
Author: TaiJuWu <[email protected]>
AuthorDate: Sat Feb 21 05:47:19 2026 +0800
KAFKA-20159: Group configuration for share.delivery.count.limit (#21491)
Make `share.delivery.count.limit` become dynamic config, we can change
the value at run time and set to specific group.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 126 +++++++++++++++++++++
.../java/kafka/server/share/ShareFetchUtils.java | 16 +++
.../java/kafka/server/share/SharePartition.java | 53 ++++++---
.../kafka/server/share/ShareFetchUtilsTest.java | 20 ++++
.../kafka/server/share/SharePartitionTest.java | 108 ++++++++++++++++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../kafka/coordinator/group/GroupConfig.java | 24 ++++
.../group/modern/share/ShareGroupConfig.java | 35 +++++-
.../kafka/coordinator/group/GroupConfigTest.java | 18 ++-
.../group/modern/share/ShareGroupConfigTest.java | 20 ++++
10 files changed, 400 insertions(+), 23 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index b67202e29fb..e3de0ea3442 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -3888,6 +3888,119 @@ public class ShareConsumerTest {
}
}
+ @ClusterTest
+ public void testDynamicDeliveryCountLimitDecreaseArchivesRecords() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ // Produce 1 record.
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(),
+ null, "key".getBytes(), "value".getBytes());
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ // Consume and release twice (deliveryCount becomes 2).
+ // Delivery 1: acquire (deliveryCount=1) → release (1 < default
limit 5 → AVAILABLE).
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Delivery 2: acquire (deliveryCount=2) → release (2 < 5 →
AVAILABLE).
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ assertEquals((short) 2,
records.records(tp).get(0).deliveryCount().get());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Dynamically decrease delivery count limit to 3 via group config.
+ alterShareDeliveryCountLimit("group1", "3");
+
+ // Delivery 3: acquire (deliveryCount=3) → release (3 >= 3 →
ARCHIVED).
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ assertEquals((short) 3,
records.records(tp).get(0).deliveryCount().get());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Next poll should have no records because the record was
archived.
+ records = shareConsumer.poll(Duration.ofMillis(2500L));
+ assertTrue(records.isEmpty(),
+ "Records should be empty as the record was archived. But
received: " + records.count());
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key = "group.share.delivery.count.limit",
value = "3"),
+ }
+ )
+ public void testDynamicDeliveryCountLimitIncreaseAllowsMoreDeliveries() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ // Produce 1 record.
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(),
+ null, "key".getBytes(), "value".getBytes());
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ // Consume and release twice (deliveryCount becomes 2).
+ // Delivery 1: acquire (deliveryCount=1) → release (1 < broker
limit 3 → AVAILABLE).
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Delivery 2: acquire (deliveryCount=2) → release (2 < 3 →
AVAILABLE).
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ assertEquals((short) 2,
records.records(tp).get(0).deliveryCount().get());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Dynamically increase delivery count limit to 5 via group config.
+ alterShareDeliveryCountLimit("group1", "5");
+
+ // Delivery 3: acquire (deliveryCount=3) → release (3 < 5 →
AVAILABLE).
+ // Without the config increase, 3 >= 3 would have caused archival.
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ assertEquals((short) 3,
records.records(tp).get(0).deliveryCount().get());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Delivery 4: acquire (deliveryCount=4) → release (4 < 5 →
AVAILABLE).
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ assertEquals((short) 4,
records.records(tp).get(0).deliveryCount().get());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Delivery 5: acquire (deliveryCount=5) → release (5 >= 5 →
ARCHIVED).
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ assertEquals((short) 5,
records.records(tp).get(0).deliveryCount().get());
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Next poll should have no records because the record was
archived.
+ records = shareConsumer.poll(Duration.ofMillis(2500L));
+ assertTrue(records.isEmpty(),
+ "Records should be empty as the record was archived. But
received: " + records.count());
+ }
+ }
+
/**
* Util class to encapsulate state for a consumer/producer
* being executed by an {@link ExecutorService}.
@@ -4186,6 +4299,19 @@ public class ShareConsumerTest {
}
}
+ private void alterShareDeliveryCountLimit(String groupId, String newValue)
{
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, groupId);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
+ alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
+ GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, newValue),
AlterConfigOp.OpType.SET)));
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+ try (Admin adminClient = createAdminClient()) {
+ assertDoesNotThrow(() ->
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+ .all()
+ .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+ }
+ }
+
private void alterShareIsolationLevel(String groupId, String newValue) {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, groupId);
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index deb6e7052d5..e16a2a43a63 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -275,6 +275,22 @@ public class ShareFetchUtils {
return defaultValue;
}
+ /**
+ * The method is used to get the delivery count limit for the group. If
the group config is present,
+ * then the delivery count limit is returned. Otherwise, the default value
is returned.
+ *
+ * @param groupConfigManager The group config manager.
+ * @param groupId The group id for which the delivery count limit is to be
fetched.
+ * @param defaultValue The default value to be returned if the group
config is not present.
+ * @return The delivery count limit for the group.
+ */
+ public static int deliveryCountLimitOrDefault(GroupConfigManager
groupConfigManager, String groupId, int defaultValue) {
+ if (groupConfigManager.groupConfig(groupId).isPresent()) {
+ return
groupConfigManager.groupConfig(groupId).get().shareDeliveryCountLimit();
+ }
+ return defaultValue;
+ }
+
/**
* Merges contiguous AcquiredRecords with the same delivery count into
single records.
* <p>
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 3df03e9853e..fad0618460b 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -91,6 +91,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static kafka.server.share.ShareFetchUtils.deliveryCountLimitOrDefault;
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
@@ -200,17 +201,13 @@ public class SharePartition {
private final int maxInFlightRecords;
/**
+ * This is the default value which is used unless the group has a
configuration which overrides it.
* The max delivery count is used to limit the number of times a record
can be delivered to the
* consumer. The max delivery count is used to prevent the consumer
re-delivering the same record
* indefinitely.
*/
- private final int maxDeliveryCount;
+ private final int defaultMaxDeliveryCount;
- /**
- * Records whose delivery count exceeds this are deemed abnormal and the
batching of these records
- * should be reduced. The limit is set to half of maxDeliveryCount rounded
up, with a minimum of 2.
- */
- private final int throttleRecordsDeliveryLimit;
/**
* The group config manager is used to retrieve the values for dynamic
group configurations
*/
@@ -335,7 +332,7 @@ public class SharePartition {
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightRecords,
- int maxDeliveryCount,
+ int defaultMaxDeliveryCount,
int defaultRecordLockDurationMs,
Timer timer,
Time time,
@@ -344,7 +341,7 @@ public class SharePartition {
GroupConfigManager groupConfigManager,
SharePartitionListener listener
) {
- this(groupId, topicIdPartition, leaderEpoch, maxInFlightRecords,
maxDeliveryCount, defaultRecordLockDurationMs,
+ this(groupId, topicIdPartition, leaderEpoch, maxInFlightRecords,
defaultMaxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, groupConfigManager,
SharePartitionState.EMPTY, listener,
new SharePartitionMetrics(groupId, topicIdPartition.topic(),
topicIdPartition.partition()));
}
@@ -356,7 +353,7 @@ public class SharePartition {
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightRecords,
- int maxDeliveryCount,
+ int defaultMaxDeliveryCount,
int defaultRecordLockDurationMs,
Timer timer,
Time time,
@@ -371,8 +368,7 @@ public class SharePartition {
this.topicIdPartition = topicIdPartition;
this.leaderEpoch = leaderEpoch;
this.maxInFlightRecords = maxInFlightRecords;
- this.maxDeliveryCount = maxDeliveryCount;
- this.throttleRecordsDeliveryLimit =
Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int) Math.ceil((double)
maxDeliveryCount / 2));
+ this.defaultMaxDeliveryCount = defaultMaxDeliveryCount;
this.cachedState = new ConcurrentSkipListMap<>();
this.lock = new ReentrantReadWriteLock();
this.findNextFetchOffset = false;
@@ -927,7 +923,7 @@ public class SharePartition {
continue;
}
- InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount, memberId);
+ InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId);
if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.info("Unable to acquire records for the batch: {} in
share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
@@ -1117,7 +1113,7 @@ public class SharePartition {
InFlightState updateResult =
offsetState.getValue().startStateTransition(
offsetState.getKey() < startOffset ?
RecordState.ARCHIVED : recordState,
DeliveryCountOps.NO_OP,
- this.maxDeliveryCount,
+ this.maxDeliveryCount(),
EMPTY_MEMBER_ID
);
if (updateResult == null) {
@@ -1159,7 +1155,7 @@ public class SharePartition {
InFlightState updateResult =
inFlightBatch.startBatchStateTransition(
inFlightBatch.lastOffset() < startOffset ?
RecordState.ARCHIVED : recordState,
DeliveryCountOps.NO_OP,
- this.maxDeliveryCount,
+ this.maxDeliveryCount(),
EMPTY_MEMBER_ID
);
if (updateResult == null) {
@@ -1924,6 +1920,8 @@ public class SharePartition {
int acquiredCount = 0;
long maxFetchRecordsWhileThrottledRecords = -1;
boolean hasThrottledRecord = false;
+ int maxDeliveryCount = maxDeliveryCount();
+ int throttleRecordsDeliveryLimit =
throttleRecordsDeliveryLimit(maxDeliveryCount);
List<AcquiredRecords> offsetAcquiredRecords = new ArrayList<>();
lock.writeLock().lock();
try {
@@ -2056,6 +2054,7 @@ public class SharePartition {
* @return True if the batch should be throttled (delivery count >=
threshold), false otherwise.
*/
private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch,
long requestFirstOffset, long requestLastOffset) {
+ int throttleRecordsDeliveryLimit =
throttleRecordsDeliveryLimit(maxDeliveryCount());
if (inFlightBatch.offsetState() == null) {
// If offsetState is null, it means the batch is not split and
represents a single batch.
// Check if the batch is in AVAILABLE state and has no ongoing
transition.
@@ -2322,7 +2321,7 @@ public class SharePartition {
InFlightState updateResult =
offsetState.getValue().startStateTransition(
recordState,
DeliveryCountOps.NO_OP,
- this.maxDeliveryCount,
+ this.maxDeliveryCount(),
EMPTY_MEMBER_ID
);
@@ -2396,7 +2395,7 @@ public class SharePartition {
InFlightState updateResult =
inFlightBatch.startBatchStateTransition(
recordState,
DeliveryCountOps.NO_OP,
- this.maxDeliveryCount,
+ this.maxDeliveryCount(),
EMPTY_MEMBER_ID
);
if (updateResult == null) {
@@ -2933,7 +2932,7 @@ public class SharePartition {
InFlightState updateResult = inFlightBatch.tryUpdateBatchState(
inFlightBatch.lastOffset() < startOffset ?
RecordState.ARCHIVED : RecordState.AVAILABLE,
DeliveryCountOps.NO_OP,
- maxDeliveryCount,
+ maxDeliveryCount(),
EMPTY_MEMBER_ID);
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for
the batch: {}"
@@ -2987,7 +2986,7 @@ public class SharePartition {
InFlightState updateResult = offsetState.getValue().tryUpdateState(
offsetState.getKey() < startOffset ? RecordState.ARCHIVED
: RecordState.AVAILABLE,
DeliveryCountOps.NO_OP,
- maxDeliveryCount,
+ maxDeliveryCount(),
EMPTY_MEMBER_ID);
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for
the offset: {} in batch: {}"
@@ -3309,6 +3308,24 @@ public class SharePartition {
return deliveryCompleteCount.get();
}
+ /**
+ * Returns the effective max delivery count for this share partition,
using the per-group dynamic
+ * config if available, otherwise the broker default.
+ */
+ int maxDeliveryCount() {
+ return deliveryCountLimitOrDefault(groupConfigManager, groupId,
defaultMaxDeliveryCount);
+ }
+
+ /**
+ * Returns the throttle records delivery limit, computed as half of the
effective max delivery
+ * count rounded up, with a minimum of {@link
#MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT}.
+ */
+ private static int throttleRecordsDeliveryLimit(int maxDeliveryCount) {
+ return Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int)
Math.ceil((double) maxDeliveryCount / 2));
+ }
+
+
+
/**
* The GapWindow class is used to record the gap start and end offset of
the probable gaps
* of available records which are neither known to Persister nor to
SharePartition. Share Partition
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 08ce763b8ad..0b6239ec68b 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -727,6 +729,24 @@ public class ShareFetchUtilsTest {
assertArrayEquals(input.toArray(), result.toArray());
}
+ @Test
+ void testDeliveryCountLimitOrDefaultWithGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+ GroupConfig groupConfig = mock(GroupConfig.class);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+
+ assertEquals(8,
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group",
5));
+ }
+
+ @Test
+ void testDeliveryCountLimitOrDefaultWithoutGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+
+ assertEquals(5,
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group",
5));
+ }
+
private static class RecordsArgumentsProvider implements ArgumentsProvider
{
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext
context) throws Exception {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 00c080f695b..88b173c5239 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -12246,6 +12246,114 @@ public class SharePartitionTest {
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
}
+ @Test
+ public void testMaxDeliveryCountUsesGroupConfigWhenPresent() {
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withMaxDeliveryCount(5)
+ .withGroupConfigManager(groupConfigManager)
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ // maxDeliveryCount() should return the group config value, not the
default.
+ assertEquals(8, sharePartition.maxDeliveryCount());
+ }
+
+ @Test
+ public void testMaxDeliveryCountFallsBackToDefaultWhenNoGroupConfig() {
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withMaxDeliveryCount(5)
+ .withGroupConfigManager(groupConfigManager)
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ // maxDeliveryCount() should return the default value.
+ assertEquals(5, sharePartition.maxDeliveryCount());
+ }
+
+ @Test
+ public void testDynamicDeliveryCountDecreaseCausesArchival() {
+ // Start with a high default limit so records are not archived
initially.
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withMaxDeliveryCount(10)
+ .withGroupConfigManager(groupConfigManager)
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ MemoryRecords records = memoryRecords(5, 10);
+
+ // Acquire and release: deliveryCount becomes 1, then released to
AVAILABLE.
+ fetchAcquiredRecords(sharePartition, records, 10);
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
+
+ // Acquire again: deliveryCount becomes 2, state=ACQUIRED.
+ fetchAcquiredRecords(sharePartition, records, 10);
+
+ // Dynamically decrease the limit to 2 via group config BEFORE
releasing.
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(2);
+
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+ // Release: archival check fires because deliveryCount(2) >=
maxDeliveryCount(2),
+ // so records transition to ARCHIVED instead of AVAILABLE.
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
+
+ // Next fetch should return 0 records since all records are archived.
+ fetchAcquiredRecords(sharePartition, records, 0);
+
+ // Records should have been archived - next fetch offset should have
advanced.
+ assertEquals(15, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() {
+ // Start with limit = 2, records get archived after 2 deliveries.
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withMaxDeliveryCount(2)
+ .withGroupConfigManager(groupConfigManager)
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ MemoryRecords records = memoryRecords(5, 10);
+
+ // First acquire: deliveryCount = 1.
+ fetchAcquiredRecords(sharePartition, records, 10);
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
+
+ // Now increase limit to 10 via group config before the second acquire.
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(10);
+
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+ // Second acquire: deliveryCount = 2. With old limit (2) this would
archive.
+ // With new limit (10) it should stay acquirable.
+ fetchAcquiredRecords(sharePartition, records, 10);
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
+
+ // Third acquire should still work since limit is now 10.
+ fetchAcquiredRecords(sharePartition, records, 10);
+
+ // Records are still in the cached state, not archived.
+ assertFalse(sharePartition.cachedState().isEmpty());
+ }
+
private static class SharePartitionBuilder {
private int defaultAcquisitionLockTimeoutMs = 30000;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fad3681091a..7ac893f378d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -359,6 +359,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
+ cgConfigs.put(SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT.toString)
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG,
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index a012d1469b2..bff700cd73d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -52,6 +52,8 @@ public final class GroupConfig extends AbstractConfig {
public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG =
"share.record.lock.duration.ms";
+ public static final String SHARE_DELIVERY_COUNT_LIMIT_CONFIG =
"share.delivery.count.limit";
+
public static final String SHARE_AUTO_OFFSET_RESET_CONFIG =
"share.auto.offset.reset";
public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT =
ShareGroupAutoOffsetResetStrategy.LATEST.name();
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to
initialize the share-partition start offset. " +
@@ -87,6 +89,8 @@ public final class GroupConfig extends AbstractConfig {
public final int shareRecordLockDurationMs;
+ public final int shareDeliveryCountLimit;
+
public final String shareAutoOffsetReset;
public final int streamsSessionTimeoutMs;
@@ -130,6 +134,12 @@ public final class GroupConfig extends AbstractConfig {
atLeast(1000),
MEDIUM,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
+ .define(SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+ INT,
+ ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT,
+ atLeast(2),
+ MEDIUM,
+ ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_AUTO_OFFSET_RESET_CONFIG,
STRING,
SHARE_AUTO_OFFSET_RESET_DEFAULT,
@@ -174,6 +184,7 @@ public final class GroupConfig extends AbstractConfig {
this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
this.shareHeartbeatIntervalMs =
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareRecordLockDurationMs =
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+ this.shareDeliveryCountLimit =
getInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
this.streamsSessionTimeoutMs =
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs =
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -216,6 +227,7 @@ public final class GroupConfig extends AbstractConfig {
int shareHeartbeatInterval = (Integer)
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
int shareSessionTimeout = (Integer)
valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
int shareRecordLockDurationMs = (Integer)
valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+ int shareDeliveryCountLimit = (Integer)
valueMaps.get(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
int streamsSessionTimeoutMs = (Integer)
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
int streamsHeartbeatIntervalMs = (Integer)
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
int streamsNumStandbyReplicas = (Integer)
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
@@ -259,6 +271,14 @@ public final class GroupConfig extends AbstractConfig {
throw new
InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be
less than or equal to " +
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
}
+ if (shareDeliveryCountLimit <
shareGroupConfig.shareGroupMinDeliveryCountLimit()) {
+ throw new
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be
greater than or equal to " +
+
ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
+ }
+ if (shareDeliveryCountLimit >
shareGroupConfig.shareGroupMaxDeliveryCountLimit()) {
+ throw new
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be
less than or equal to " +
+
ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
+ }
if (streamsHeartbeatIntervalMs <
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
throw new
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -355,6 +375,10 @@ public final class GroupConfig extends AbstractConfig {
return shareHeartbeatIntervalMs;
}
+ public int shareDeliveryCountLimit() {
+ return shareDeliveryCountLimit;
+ }
+
/**
* The share group record lock duration milliseconds.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 45e174d1a59..761a2dcb7fc 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -47,17 +47,25 @@ public class ShareGroupConfig {
public static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT = 5;
public static final String SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC = "The
maximum number of delivery attempts for a record delivered to a share group.";
+ public static final String SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG =
"group.share.max.delivery.count.limit";
+ public static final int SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT = 10;
+ public static final String SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC = "The
maximum value of a group configuration for the maximum number of delivery
attempts for a record delivered to a share group.";
+
+ public static final String SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG =
"group.share.min.delivery.count.limit";
+ public static final int SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT = 2;
+ public static final String SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DOC = "The
minimum value of a group configuration for the maximum number of delivery
attempts for a record delivered to a share group.";
+
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG =
"group.share.record.lock.duration.ms";
public static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT =
30000;
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC = "The
record acquisition lock duration in milliseconds for share groups.";
public static final String SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG
= "group.share.min.record.lock.duration.ms";
public static final int SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT =
15000;
- public static final String SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC =
"The record acquisition lock minimum duration in milliseconds for share
groups.";
+ public static final String SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC =
"The minimum value of a group configuration for record acquisition lock
duration in milliseconds.";
public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG
= "group.share.max.record.lock.duration.ms";
public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT =
60000;
- public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC =
"The record acquisition lock maximum duration in milliseconds for share
groups.";
+ public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC =
"The maximum value of a group configuration for record acquisition lock
duration in milliseconds.";
public static final String
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG =
"share.fetch.purgatory.purge.interval.requests";
public static final int
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT = 1000;
@@ -75,6 +83,8 @@ public class ShareGroupConfig {
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN,
SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
+ .define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM,
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
+ .define(SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 5), MEDIUM,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 3600000), MEDIUM,
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000),
MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
@@ -86,6 +96,8 @@ public class ShareGroupConfig {
private final boolean isShareGroupEnabled;
private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupDeliveryCountLimit;
+ private final int shareGroupMaxDeliveryCountLimit;
+ private final int shareGroupMinDeliveryCountLimit;
private final int shareGroupRecordLockDurationMs;
private final int shareGroupMaxRecordLockDurationMs;
private final int shareGroupMinRecordLockDurationMs;
@@ -100,6 +112,8 @@ public class ShareGroupConfig {
isShareGroupEnabled =
config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG);
shareGroupPartitionMaxRecordLocks =
config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
shareGroupDeliveryCountLimit =
config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG);
+ shareGroupMaxDeliveryCountLimit =
config.getInt(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
+ shareGroupMinDeliveryCountLimit =
config.getInt(SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
shareGroupRecordLockDurationMs =
config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMaxRecordLockDurationMs =
config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMinRecordLockDurationMs =
config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
@@ -122,6 +136,14 @@ public class ShareGroupConfig {
return shareGroupDeliveryCountLimit;
}
+ public int shareGroupMaxDeliveryCountLimit() {
+ return shareGroupMaxDeliveryCountLimit;
+ }
+
+ public int shareGroupMinDeliveryCountLimit() {
+ return shareGroupMinDeliveryCountLimit;
+ }
+
public int shareGroupRecordLockDurationMs() {
return shareGroupRecordLockDurationMs;
}
@@ -147,6 +169,12 @@ public class ShareGroupConfig {
}
private void validate() {
+ Utils.require(shareGroupMaxDeliveryCountLimit >=
shareGroupDeliveryCountLimit,
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG));
+ Utils.require(shareGroupDeliveryCountLimit >=
shareGroupMinDeliveryCountLimit,
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG));
Utils.require(shareGroupRecordLockDurationMs >=
shareGroupMinRecordLockDurationMs,
String.format("%s must be greater than or equal to %s",
SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG));
@@ -168,7 +196,8 @@ public class ShareGroupConfig {
return Map.of(
GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs(),
- GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupRecordLockDurationMs()
+ GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupRecordLockDurationMs(),
+ GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupDeliveryCountLimit()
);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index a6837926818..6f5e1cf1f13 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -41,6 +41,8 @@ public class GroupConfigTest {
private static final boolean SHARE_GROUP_ENABLE = true;
private static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS = 200;
private static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT = 5;
+ private static final int SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT = 2;
+ private static final int SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT = 10;
private static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS = 30000;
private static final int SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS = 15000;
private static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS = 60000;
@@ -171,6 +173,16 @@ public class GroupConfigTest {
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
+ // Check for invalid shareDeliveryCountLimit, < MIN
+ props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "1");
+ doTestInvalidProps(props, ConfigException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid shareDeliveryCountLimit, > MAX
+ props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "11");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
// Check for invalid shareAutoOffsetReset
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
doTestInvalidProps(props, ConfigException.class);
@@ -234,6 +246,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
"2000");
+ defaultValue.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "2");
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
"10");
@@ -250,6 +263,7 @@ public class GroupConfigTest {
assertEquals(10,
config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
+ assertEquals(2,
config.getInt(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG));
assertEquals("latest",
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
assertEquals("read_uncommitted",
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
@@ -272,6 +286,7 @@ public class GroupConfigTest {
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
+ props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "5");
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
@@ -287,7 +302,8 @@ public class GroupConfigTest {
}
private ShareGroupConfig createShareGroupConfig() {
- return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE,
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS, SHARE_GROUP_DELIVERY_COUNT_LIMIT,
+ return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE,
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS,
+ SHARE_GROUP_DELIVERY_COUNT_LIMIT,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT,
SHARE_GROUP_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 73c0c0cc10e..b9969c09b63 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -39,6 +39,8 @@ public class ShareGroupConfigTest {
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, true);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG,
200);
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
5);
+
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, 2);
+
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, 9);
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 30000);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
15000);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG,
60000);
@@ -75,6 +77,20 @@ public class ShareGroupConfigTest {
assertEquals("group.share.max.record.lock.duration.ms must be greater
than or equal to group.share.record.lock.duration.ms",
assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+ configs.clear();
+ // test for when SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG is less than
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG
+
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, 3);
+ configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
2);
+ assertEquals("group.share.delivery.count.limit must be greater than or
equal to group.share.min.delivery.count.limit",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ configs.clear();
+ // test for when SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG is greater
than SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG
+
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, 9);
+ configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
10);
+ assertEquals("group.share.max.delivery.count.limit must be greater
than or equal to group.share.delivery.count.limit",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
configs.clear();
// test for when SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG is out of
bounds
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
1);
@@ -111,6 +127,8 @@ public class ShareGroupConfigTest {
boolean shareGroupEnable,
int shareGroupPartitionMaxRecordLocks,
int shareGroupDeliveryCountLimit,
+ int shareGroupMinDeliveryCountLimit,
+ int shareGroupMaxDeliveryCountLimit,
int shareGroupRecordLockDurationsMs,
int shareGroupMinRecordLockDurationMs,
int shareGroupMaxRecordLockDurationMs
@@ -119,6 +137,8 @@ public class ShareGroupConfigTest {
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG,
shareGroupEnable);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupPartitionMaxRecordLocks);
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupDeliveryCountLimit);
+
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupMinDeliveryCountLimit);
+
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupMaxDeliveryCountLimit);
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupRecordLockDurationsMs);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupMinRecordLockDurationMs);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupMaxRecordLockDurationMs);