This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8dbf56e4b5b KAFKA-17541:[1/2] Improve handling of delivery count
(#19430)
8dbf56e4b5b is described below
commit 8dbf56e4b5b295116d14cb0eae8cc1a741d48440
Author: Lan Ding <[email protected]>
AuthorDate: Thu May 1 21:40:03 2025 +0800
KAFKA-17541:[1/2] Improve handling of delivery count (#19430)
For records which are automatically released as a result of closing a
share session normally, the delivery count should not be incremented.
These records were fetched but they were not actually delivered to the
client, since the disposition of the delivered records is carried in the
ShareAcknowledge which closes the share session. Any remaining records
were not delivered, only fetched.
This PR decrements the delivery count of records that are released when a
share session is closed normally, undoing the increment that was applied
when they were acquired.
Co-authored-by: d00791190 <[email protected]>
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 58 ++++++++-
.../java/kafka/server/share/SharePartition.java | 62 ++++++----
.../kafka/server/share/SharePartitionTest.java | 130 ++++++++++++++-------
3 files changed, 186 insertions(+), 64 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 51ec647fd7c..edb6d56215e 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -1709,11 +1709,9 @@ public class ShareConsumerTest {
public void testShareAutoOffsetResetEarliestAfterLsoMovement() {
alterShareAutoOffsetReset("group1", "earliest");
try (
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
Producer<byte[], byte[]> producer = createProducer();
Admin adminClient = createAdminClient()
) {
- shareConsumer.subscribe(Set.of(tp.topic()));
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
// We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
@@ -1993,6 +1991,62 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
+ @ClusterTest
+ public void testDeliveryCountNotIncreaseAfterSessionClose() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer()) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ // We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
+ for (int i = 0; i < 10; i++) {
+ assertDoesNotThrow(() -> producer.send(record).get(), "Failed
to send records");
+ }
+ }
+
+ // Perform the fetch, close in a loop.
+ for (int count = 0; count <
ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT; count++) {
+ consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, false);
+ }
+
+ // If the delivery count is increased, consumer will get nothing.
+ int consumedMessageCount = consumeMessages(new AtomicInteger(0), 10,
"group1", 1, 10, true);
+ // The records returned belong to offsets 0-9.
+ assertEquals(10, consumedMessageCount);
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void
testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAcknowledgement()
{
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))) {
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null,
+ "key".getBytes(), "value".getBytes());
+ producer.send(record);
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 2);
+ assertEquals(2, records.count());
+ // Acknowledge the first record with AcknowledgeType.RELEASE
+ shareConsumer.acknowledge(records.records(tp).get(0),
AcknowledgeType.RELEASE);
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ }
+
+ // Test delivery count
+ try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1", Map.of())) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 2);
+ assertEquals(2, records.count());
+ assertEquals((short) 2,
records.records(tp).get(0).deliveryCount().get());
+ assertEquals((short) 1,
records.records(tp).get(1).deliveryCount().get());
+ }
+ }
+
@ClusterTest(
brokers = 3,
serverProperties = {
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index db2ff4fc8cf..fa80c230a58 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -197,6 +197,16 @@ public class SharePartition {
}
}
+ /**
+ * The DeliveryCountOps is used to specify the behavior on the delivery
count: increase, decrease,
+ * or do nothing.
+ */
+ private enum DeliveryCountOps {
+ INCREASE,
+ DECREASE,
+ NO_OP
+ }
+
/**
* The group id of the share partition belongs to.
*/
@@ -832,7 +842,7 @@ public class SharePartition {
continue;
}
- InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, maxDeliveryCount,
memberId);
+ InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount, memberId);
if (updateResult == null) {
log.info("Unable to acquire records for the batch: {} in
share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
@@ -1024,9 +1034,10 @@ public class SharePartition {
return Optional.empty();
}
if (offsetState.getValue().state == RecordState.ACQUIRED) {
+ // These records were fetched but they were not actually
delivered to the client.
InFlightState updateResult =
offsetState.getValue().startStateTransition(
offsetState.getKey() < startOffset ?
RecordState.ARCHIVED : recordState,
- false,
+ DeliveryCountOps.DECREASE,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
@@ -1072,7 +1083,7 @@ public class SharePartition {
if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
InFlightState updateResult =
inFlightBatch.startBatchStateTransition(
inFlightBatch.lastOffset() < startOffset ?
RecordState.ARCHIVED : recordState,
- false,
+ DeliveryCountOps.DECREASE,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
@@ -1624,8 +1635,8 @@ public class SharePartition {
continue;
}
- InFlightState updateResult =
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true,
maxDeliveryCount,
- memberId);
+ InFlightState updateResult =
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE,
+ maxDeliveryCount, memberId);
if (updateResult == null) {
log.trace("Unable to acquire records for the offset: {} in
batch: {}"
+ " for the share partition: {}-{}",
offsetState.getKey(), inFlightBatch,
@@ -1897,7 +1908,7 @@ public class SharePartition {
recordStateDefault;
InFlightState updateResult =
offsetState.getValue().startStateTransition(
recordState,
- false,
+ DeliveryCountOps.NO_OP,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
@@ -1950,7 +1961,7 @@ public class SharePartition {
// is only important when the batch is acquired.
InFlightState updateResult =
inFlightBatch.startBatchStateTransition(
recordState,
- false,
+ DeliveryCountOps.NO_OP,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
@@ -2409,7 +2420,7 @@ public class SharePartition {
if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
InFlightState updateResult = inFlightBatch.tryUpdateBatchState(
inFlightBatch.lastOffset() < startOffset ?
RecordState.ARCHIVED : RecordState.AVAILABLE,
- false,
+ DeliveryCountOps.NO_OP,
maxDeliveryCount,
EMPTY_MEMBER_ID);
if (updateResult == null) {
@@ -2455,7 +2466,7 @@ public class SharePartition {
}
InFlightState updateResult = offsetState.getValue().tryUpdateState(
offsetState.getKey() < startOffset ? RecordState.ARCHIVED
: RecordState.AVAILABLE,
- false,
+ DeliveryCountOps.NO_OP,
maxDeliveryCount,
EMPTY_MEMBER_ID);
if (updateResult == null) {
@@ -2875,19 +2886,19 @@ public class SharePartition {
inFlightState().archive(newMemberId);
}
- private InFlightState tryUpdateBatchState(RecordState newState,
boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+ private InFlightState tryUpdateBatchState(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
if (batchState == null) {
throw new IllegalStateException("The batch state update is not
available as the offset state is maintained");
}
- return batchState.tryUpdateState(newState, incrementDeliveryCount,
maxDeliveryCount, newMemberId);
+ return batchState.tryUpdateState(newState, ops, maxDeliveryCount,
newMemberId);
}
- private InFlightState startBatchStateTransition(RecordState newState,
boolean incrementDeliveryCount, int maxDeliveryCount,
+ private InFlightState startBatchStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount,
String newMemberId) {
if (batchState == null) {
throw new IllegalStateException("The batch state update is not
available as the offset state is maintained");
}
- return batchState.startStateTransition(newState,
incrementDeliveryCount, maxDeliveryCount, newMemberId);
+ return batchState.startStateTransition(newState, ops,
maxDeliveryCount, newMemberId);
}
private void maybeInitializeOffsetStateUpdate() {
@@ -2999,23 +3010,23 @@ public class SharePartition {
/**
* Try to update the state of the records. The state of the records
can only be updated if the
- * new state is allowed to be transitioned from old state. The
delivery count is not incremented
+ * new state is allowed to be transitioned from old state. The
delivery count is not changed
* if the state update is unsuccessful.
*
* @param newState The new state of the records.
- * @param incrementDeliveryCount Whether to increment the delivery
count.
+ * @param ops The behavior on the delivery count.
*
* @return {@code InFlightState} if update succeeds, null otherwise.
Returning state
* helps update chaining.
*/
- private InFlightState tryUpdateState(RecordState newState, boolean
incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+ private InFlightState tryUpdateState(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
try {
- if (newState == RecordState.AVAILABLE && deliveryCount >=
maxDeliveryCount) {
+ if (newState == RecordState.AVAILABLE && ops !=
DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
newState = RecordState.ARCHIVED;
}
state = state.validateTransition(newState);
- if (incrementDeliveryCount && newState !=
RecordState.ARCHIVED) {
- deliveryCount++;
+ if (newState != RecordState.ARCHIVED) {
+ deliveryCount = updatedDeliveryCount(ops);
}
memberId = newMemberId;
return this;
@@ -3025,14 +3036,23 @@ public class SharePartition {
}
}
+ private int updatedDeliveryCount(DeliveryCountOps ops) {
+ return switch (ops) {
+ case INCREASE -> deliveryCount + 1;
+ case DECREASE -> deliveryCount - 1;
+ // do nothing
+ case NO_OP -> deliveryCount;
+ };
+ }
+
private void archive(String newMemberId) {
state = RecordState.ARCHIVED;
memberId = newMemberId;
}
- private InFlightState startStateTransition(RecordState newState,
boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+ private InFlightState startStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
rollbackState = new InFlightState(state, deliveryCount, memberId,
acquisitionLockTimeoutTask);
- return tryUpdateState(newState, incrementDeliveryCount,
maxDeliveryCount, newMemberId);
+ return tryUpdateState(newState, ops, maxDeliveryCount,
newMemberId);
}
private void completeStateTransition(boolean commit) {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 64781648774..465bce6de6a 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -3652,7 +3652,8 @@ public class SharePartitionTest {
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).batchState());
- assertEquals(1,
sharePartition.cachedState().get(0L).batchDeliveryCount());
+ // Release delivery count.
+ assertEquals(0,
sharePartition.cachedState().get(0L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(0L).offsetState());
}
@@ -3669,7 +3670,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
- assertEquals(1,
sharePartition.cachedState().get(5L).batchDeliveryCount());
+ assertEquals(0,
sharePartition.cachedState().get(5L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(5L).offsetState());
}
@@ -3731,7 +3732,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
- expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
@@ -3745,8 +3746,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
}
@@ -3792,8 +3793,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
// Release acquired records for "member-2".
@@ -3815,8 +3816,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
}
@@ -3862,8 +3863,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
// Ack subset of records by "member-2".
@@ -3879,7 +3880,7 @@ public class SharePartitionTest {
// Check cached state.
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
@@ -3891,8 +3892,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
}
@@ -3926,14 +3927,14 @@ public class SharePartitionTest {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(7L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(7L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(8L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(9L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
}
@Test
- public void
testMaxDeliveryCountLimitExceededForRecordsSubsetAfterReleaseAcquiredRecords() {
+ public void
testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcquiredRecords()
{
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
@@ -3955,12 +3956,12 @@ public class SharePartitionTest {
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
- assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
assertNull(sharePartition.cachedState().get(10L).offsetState());
}
@Test
- public void
testMaxDeliveryCountLimitExceededForRecordsSubsetAfterReleaseAcquiredRecordsSubset()
{
+ public void
testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcquiredRecordsSubset()
{
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxDeliveryCount(2)
.withState(SharePartitionState.ACTIVE)
@@ -3999,21 +4000,21 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(15L).batchState());
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
- assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(20L).batchState());
assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(20L).batchMemberId());
assertNull(sharePartition.cachedState().get(20L).offsetState());
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
- expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
expectedOffsetStateMap.clear();
- expectedOffsetStateMap.put(15L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(15L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
@@ -4050,9 +4051,10 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records2, 2);
fetchAcquiredRecords(sharePartition, records3, 5);
- CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
- assertNull(releaseResult.join());
- assertFalse(releaseResult.isCompletedExceptionally());
+ sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of(
+ new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)),
+ new ShareAcknowledgementBatch(20, 24, List.of((byte) 2))
+ )));
assertEquals(25, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.cachedState().size());
@@ -4172,7 +4174,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
- assertEquals(1,
sharePartition.cachedState().get(5L).batchDeliveryCount());
+ assertEquals(0,
sharePartition.cachedState().get(5L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(5L).offsetState());
// Acquisition lock timer task would be cancelled by the release
acquired records operation.
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
@@ -4215,7 +4217,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
- expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
@@ -4229,8 +4231,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
// Acquisition lock timer task would be cancelled by the release
acquired records operation.
@@ -4816,7 +4818,7 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(21L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(22L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(23L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(24L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(24L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(20L).offsetState());
@@ -4830,8 +4832,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(35L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(36L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(37L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(38L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(39L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(38L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(39L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(35L).offsetState());
}
@@ -4887,14 +4889,60 @@ public class SharePartitionTest {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
}
+ @Test
+ public void testReleaseAcquiredRecordsDecreaseDeliveryCount() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
+
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(12, 13, List.of((byte) 1))));
+
+ // LSO is at 11.
+ sharePartition.updateCacheAndOffsets(11);
+
+ assertEquals(15, sharePartition.nextFetchOffset());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(14, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+
+ // Before release, the delivery count was incremented.
+ Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+
+ // Release acquired records.
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
+ assertFalse(releaseResult.isCompletedExceptionally());
+
+ // Check delivery count.
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).batchMemberId());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(1,
sharePartition.cachedState().get(5L).batchDeliveryCount());
+
+ // After release, the delivery count was decremented.
+ expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+ }
+
@Test
public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement()
throws InterruptedException {
SharePartition sharePartition = SharePartitionBuilder.builder()
@@ -6668,7 +6716,7 @@ public class SharePartitionTest {
});
});
}
-
+
private String assertionFailedMessage(SharePartition sharePartition,
Map<Long, List<Long>> offsets) {
StringBuilder errorMessage = new
StringBuilder(ACQUISITION_LOCK_NEVER_GOT_RELEASED + String.format(
" timer size: %d, next fetch offset: %d\n",