This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71299c229a2 KAFKA-19972: Bump delivery count on session release
(#21092)
71299c229a2 is described below
commit 71299c229a2ea008a14b4fc98ef4ce538c68f03c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Dec 5 21:51:47 2025 +0000
KAFKA-19972: Bump delivery count on session release (#21092)
If a client application crashes then it can go in infinte loop where
same records will be delivered. Though previously we chose to decrease
the delivery count on session release as we didn't have throttling
support. Now when we do then it makes sense to bump the delivery count
on session close. Also as share-groups clients should ideally not have
pre-fetched data hence it's safe to bump the delivery count on session
release.
I have not removed the code to decrease the delivery count as that
functionality is well tested and we might need at the time of
pre-fetching support or in cases where we do need not to bump the
delivery count, in future.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 34 ++-------
.../java/kafka/server/share/SharePartition.java | 4 +-
.../kafka/server/share/SharePartitionTest.java | 85 +++++++++++-----------
3 files changed, 49 insertions(+), 74 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index d40f223ece2..85fa049636e 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2218,29 +2218,6 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
- @ClusterTest
- public void testDeliveryCountNotIncreaseAfterSessionClose() {
- alterShareAutoOffsetReset("group1", "earliest");
- try (Producer<byte[], byte[]> producer = createProducer()) {
- ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
- // We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
- for (int i = 0; i < 10; i++) {
- assertDoesNotThrow(() -> producer.send(record).get(), "Failed
to send records");
- }
- }
-
- // Perform the fetch, close in a loop.
- for (int count = 0; count <
ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT; count++) {
- consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, false);
- }
-
- // If the delivery count is increased, consumer will get nothing.
- int consumedMessageCount = consumeMessages(new AtomicInteger(0), 10,
"group1", 1, 10, true);
- // The records returned belong to offsets 0-9.
- assertEquals(10, consumedMessageCount);
- verifyShareGroupStateTopicRecordsProduced();
- }
-
@ClusterTest
public void
testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAcknowledgement()
{
alterShareAutoOffsetReset("group1", "earliest");
@@ -2270,13 +2247,13 @@ public class ShareConsumerTest {
ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 2);
assertEquals(2, records.count());
assertEquals((short) 2,
records.records(tp).get(0).deliveryCount().get());
- assertEquals((short) 1,
records.records(tp).get(1).deliveryCount().get());
+ assertEquals((short) 2,
records.records(tp).get(1).deliveryCount().get());
}
}
@ClusterTest(
serverProperties = {
- @ClusterConfigProperty(key = "group.share.delivery.count.limit",
value = "2"),
+ @ClusterConfigProperty(key = "group.share.delivery.count.limit",
value = "3"),
}
)
public void testBehaviorOnDeliveryCountBoundary() {
@@ -2304,7 +2281,6 @@ public class ShareConsumerTest {
records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
assertEquals((short) 2,
records.records(tp).get(0).deliveryCount().get());
-
}
// Start again and same record should be delivered
@@ -2312,7 +2288,7 @@ public class ShareConsumerTest {
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
- assertEquals((short) 2,
records.records(tp).get(0).deliveryCount().get());
+ assertEquals((short) 3,
records.records(tp).get(0).deliveryCount().get());
}
}
@@ -2369,9 +2345,9 @@ public class ShareConsumerTest {
// Let the complex consumer read the messages.
service.schedule(() -> prodState.done().set(true), 5L,
TimeUnit.SECONDS);
- // All messages which can be read are read, some would be redelivered
(roughly 3 times the records produced).
+ // All messages which can be read are read, some would be redelivered
(roughly 2 times the records produced).
TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did
not close!");
- int delta = complexCons1.recordsRead() - (int)
(prodState.count().get() * 3 * 0.95); // 3 times with margin of error (5%).
+ int delta = complexCons1.recordsRead() - (int)
(prodState.count().get() * 2 * 0.95); // 2 times with margin of error (5%).
assertTrue(delta > 0,
String.format("Producer (%d) and share consumer (%d) record count
mismatch.", prodState.count().get(), complexCons1.recordsRead()));
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 1043adb71fc..243a7406d6a 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1116,7 +1116,7 @@ public class SharePartition {
// These records were fetched but they were not actually
delivered to the client.
InFlightState updateResult =
offsetState.getValue().startStateTransition(
offsetState.getKey() < startOffset ?
RecordState.ARCHIVED : recordState,
- DeliveryCountOps.DECREASE,
+ DeliveryCountOps.NO_OP,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
@@ -1158,7 +1158,7 @@ public class SharePartition {
if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
InFlightState updateResult =
inFlightBatch.startBatchStateTransition(
inFlightBatch.lastOffset() < startOffset ?
RecordState.ARCHIVED : recordState,
- DeliveryCountOps.DECREASE,
+ DeliveryCountOps.NO_OP,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index beae9e48d8d..f210ab22ef9 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -5034,7 +5034,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).batchState());
// Release delivery count.
- assertEquals(0,
sharePartition.cachedState().get(0L).batchDeliveryCount());
+ assertEquals(1,
sharePartition.cachedState().get(0L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(0L).offsetState());
assertEquals(0, sharePartition.deliveryCompleteCount());
}
@@ -5053,7 +5053,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
- assertEquals(0,
sharePartition.cachedState().get(5L).batchDeliveryCount());
+ assertEquals(1,
sharePartition.cachedState().get(5L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(5L).offsetState());
assertEquals(0, sharePartition.deliveryCompleteCount());
}
@@ -5126,7 +5126,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
- expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
@@ -5140,8 +5140,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
assertEquals(10, sharePartition.deliveryCompleteCount());
}
@@ -5192,8 +5192,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
assertEquals(9, sharePartition.deliveryCompleteCount());
@@ -5216,8 +5216,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
assertEquals(9, sharePartition.deliveryCompleteCount());
}
@@ -5268,8 +5268,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
assertEquals(9, sharePartition.deliveryCompleteCount());
@@ -5289,7 +5289,7 @@ public class SharePartitionTest {
// Check cached state.
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
expectedOffsetStateMap.clear();
expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
@@ -5301,8 +5301,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
assertEquals(9, sharePartition.deliveryCompleteCount());
}
@@ -5339,7 +5339,7 @@ public class SharePartitionTest {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(7L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(7L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(8L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(9L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
@@ -5370,9 +5370,9 @@ public class SharePartitionTest {
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(2, sharePartition.cachedState().size());
- assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(10L).batchState());
assertNull(sharePartition.cachedState().get(10L).offsetState());
- assertEquals(0, sharePartition.deliveryCompleteCount());
+ assertEquals(5, sharePartition.deliveryCompleteCount());
}
@Test
@@ -5417,26 +5417,25 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(15L).batchState());
assertNotNull(sharePartition.cachedState().get(10L).offsetState());
- assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(20L).batchState());
- assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(20L).batchMemberId());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
assertNull(sharePartition.cachedState().get(20L).offsetState());
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
- expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
expectedOffsetStateMap.clear();
- expectedOffsetStateMap.put(15L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(15L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(15L).offsetState());
- assertEquals(3, sharePartition.deliveryCompleteCount());
+ assertEquals(12, sharePartition.deliveryCompleteCount());
}
@Test
@@ -5604,7 +5603,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
- assertEquals(0,
sharePartition.cachedState().get(5L).batchDeliveryCount());
+ assertEquals(1,
sharePartition.cachedState().get(5L).batchDeliveryCount());
assertNull(sharePartition.cachedState().get(5L).offsetState());
// Acquisition lock timer task would be cancelled by the release
acquired records operation.
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
@@ -5649,7 +5648,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
// Check cached state.
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
- expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(5L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(6L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
@@ -5663,8 +5662,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
// Acquisition lock timer task would be cancelled by the release
acquired records operation.
@@ -6853,7 +6852,7 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(21L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(22L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(23L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(24L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(24L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(0, sharePartition.deliveryCompleteCount());
@@ -6869,8 +6868,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(35L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(36L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(37L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(38L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(39L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(38L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(39L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(35L).offsetState());
}
@@ -6932,10 +6931,10 @@ public class SharePartitionTest {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
@@ -6944,7 +6943,7 @@ public class SharePartitionTest {
}
@Test
- public void testReleaseAcquiredRecordsDecreaseDeliveryCount() {
+ public void testReleaseAcquiredRecordsDoNotDecreaseDeliveryCount() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
@@ -6986,10 +6985,10 @@ public class SharePartitionTest {
// After release, the delivery count was decremented.
expectedOffsetStateMap = new HashMap<>();
expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
assertEquals(2, sharePartition.deliveryCompleteCount());
}
@@ -11252,8 +11251,8 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(15L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));