This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new cecacea22b1 KAFKA-19972: Bump delivery count on session release 
(#21092)
cecacea22b1 is described below

commit cecacea22b14aa1f50ae8846c4552126f4f43687
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Dec 5 21:51:47 2025 +0000

    KAFKA-19972: Bump delivery count on session release (#21092)
    
    If a client application crashes, it can get stuck in an infinite loop in
    which the same records are delivered again and again. Previously we chose
    to decrease the delivery count on session release because we didn't have
    throttling support. Now that we do, it makes sense to bump the delivery
    count on session close. Also, since share-group clients should ideally
    not have pre-fetched data, it is safe to bump the delivery count on
    session release.
    
    I have not removed the code that decreases the delivery count, as that
    functionality is well tested and we might need it in the future, either
    when pre-fetching support is added or in cases where the delivery count
    should not be bumped.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 34 ++-------
 .../java/kafka/server/share/SharePartition.java    |  4 +-
 .../kafka/server/share/SharePartitionTest.java     | 85 +++++++++++-----------
 3 files changed, 49 insertions(+), 74 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index d40f223ece2..85fa049636e 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2218,29 +2218,6 @@ public class ShareConsumerTest {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @ClusterTest
-    public void testDeliveryCountNotIncreaseAfterSessionClose() {
-        alterShareAutoOffsetReset("group1", "earliest");
-        try (Producer<byte[], byte[]> producer = createProducer()) {
-            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
-            // We write 10 records to the topic, so they would be written from 
offsets 0-9 on the topic.
-            for (int i = 0; i < 10; i++) {
-                assertDoesNotThrow(() -> producer.send(record).get(), "Failed 
to send records");
-            }
-        }
-
-        // Perform the fetch, close in a loop.
-        for (int count = 0; count < 
ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT; count++) {
-            consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, false);
-        }
-
-        // If the delivery count is increased, consumer will get nothing.
-        int consumedMessageCount = consumeMessages(new AtomicInteger(0), 10, 
"group1", 1, 10, true);
-        // The records returned belong to offsets 0-9.
-        assertEquals(10, consumedMessageCount);
-        verifyShareGroupStateTopicRecordsProduced();
-    }
-
     @ClusterTest
     public void 
testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAcknowledgement()
 {
         alterShareAutoOffsetReset("group1", "earliest");
@@ -2270,13 +2247,13 @@ public class ShareConsumerTest {
             ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 2);
             assertEquals(2, records.count());
             assertEquals((short) 2, 
records.records(tp).get(0).deliveryCount().get());
-            assertEquals((short) 1, 
records.records(tp).get(1).deliveryCount().get());
+            assertEquals((short) 2, 
records.records(tp).get(1).deliveryCount().get());
         }
     }
 
     @ClusterTest(
         serverProperties = {
-            @ClusterConfigProperty(key = "group.share.delivery.count.limit", 
value = "2"),
+            @ClusterConfigProperty(key = "group.share.delivery.count.limit", 
value = "3"),
         }
     )
     public void testBehaviorOnDeliveryCountBoundary() {
@@ -2304,7 +2281,6 @@ public class ShareConsumerTest {
             records = waitedPoll(shareConsumer, 2500L, 1);
             assertEquals(1, records.count());
             assertEquals((short) 2, 
records.records(tp).get(0).deliveryCount().get());
-
         }
 
         // Start again and same record should be delivered
@@ -2312,7 +2288,7 @@ public class ShareConsumerTest {
             shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
             assertEquals(1, records.count());
-            assertEquals((short) 2, 
records.records(tp).get(0).deliveryCount().get());
+            assertEquals((short) 3, 
records.records(tp).get(0).deliveryCount().get());
         }
     }
 
@@ -2369,9 +2345,9 @@ public class ShareConsumerTest {
         // Let the complex consumer read the messages.
         service.schedule(() -> prodState.done().set(true), 5L, 
TimeUnit.SECONDS);
 
-        // All messages which can be read are read, some would be redelivered 
(roughly 3 times the records produced).
+        // All messages which can be read are read, some would be redelivered 
(roughly 2 times the records produced).
         TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did 
not close!");
-        int delta = complexCons1.recordsRead() - (int) 
(prodState.count().get() * 3 * 0.95);    // 3 times with margin of error (5%).
+        int delta = complexCons1.recordsRead() - (int) 
(prodState.count().get() * 2 * 0.95);    // 2 times with margin of error (5%).
 
         assertTrue(delta > 0,
             String.format("Producer (%d) and share consumer (%d) record count 
mismatch.", prodState.count().get(), complexCons1.recordsRead()));
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 1043adb71fc..243a7406d6a 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1116,7 +1116,7 @@ public class SharePartition {
                 // These records were fetched but they were not actually 
delivered to the client.
                 InFlightState updateResult = 
offsetState.getValue().startStateTransition(
                         offsetState.getKey() < startOffset ? 
RecordState.ARCHIVED : recordState,
-                        DeliveryCountOps.DECREASE,
+                        DeliveryCountOps.NO_OP,
                         this.maxDeliveryCount,
                         EMPTY_MEMBER_ID
                 );
@@ -1158,7 +1158,7 @@ public class SharePartition {
         if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
             InFlightState updateResult = 
inFlightBatch.startBatchStateTransition(
                     inFlightBatch.lastOffset() < startOffset ? 
RecordState.ARCHIVED : recordState,
-                    DeliveryCountOps.DECREASE,
+                    DeliveryCountOps.NO_OP,
                     this.maxDeliveryCount,
                     EMPTY_MEMBER_ID
             );
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index beae9e48d8d..f210ab22ef9 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -5034,7 +5034,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).batchState());
         // Release delivery count.
-        assertEquals(0, 
sharePartition.cachedState().get(0L).batchDeliveryCount());
+        assertEquals(1, 
sharePartition.cachedState().get(0L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(0L).offsetState());
         assertEquals(0, sharePartition.deliveryCompleteCount());
     }
@@ -5053,7 +5053,7 @@ public class SharePartitionTest {
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).batchState());
-        assertEquals(0, 
sharePartition.cachedState().get(5L).batchDeliveryCount());
+        assertEquals(1, 
sharePartition.cachedState().get(5L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(5L).offsetState());
         assertEquals(0, sharePartition.deliveryCompleteCount());
     }
@@ -5126,7 +5126,7 @@ public class SharePartitionTest {
         assertEquals(5, sharePartition.nextFetchOffset());
         // Check cached state.
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
-        expectedOffsetStateMap.put(5L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(5L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(6L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
 
@@ -5140,8 +5140,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
         assertEquals(10, sharePartition.deliveryCompleteCount());
     }
@@ -5192,8 +5192,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
 
@@ -5216,8 +5216,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
     }
@@ -5268,8 +5268,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
 
@@ -5289,7 +5289,7 @@ public class SharePartitionTest {
         // Check cached state.
         expectedOffsetStateMap.clear();
         expectedOffsetStateMap.put(5L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(6L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
         expectedOffsetStateMap.clear();
         expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
@@ -5301,8 +5301,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
         assertEquals(9, sharePartition.deliveryCompleteCount());
     }
@@ -5339,7 +5339,7 @@ public class SharePartitionTest {
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(5L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(6L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(7L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(8L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(9L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
@@ -5370,9 +5370,9 @@ public class SharePartitionTest {
 
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(2, sharePartition.cachedState().size());
-        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(10L).batchState());
         assertNull(sharePartition.cachedState().get(10L).offsetState());
-        assertEquals(0, sharePartition.deliveryCompleteCount());
+        assertEquals(5, sharePartition.deliveryCompleteCount());
     }
 
     @Test
@@ -5417,26 +5417,25 @@ public class SharePartitionTest {
         assertNotNull(sharePartition.cachedState().get(10L).offsetState());
         assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(15L).batchState());
         assertNotNull(sharePartition.cachedState().get(10L).offsetState());
-        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(20L).batchState());
-        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(20L).batchMemberId());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
         assertNull(sharePartition.cachedState().get(20L).offsetState());
 
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
-        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
 
         expectedOffsetStateMap.clear();
-        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(15L).offsetState());
-        assertEquals(3, sharePartition.deliveryCompleteCount());
+        assertEquals(12, sharePartition.deliveryCompleteCount());
     }
 
     @Test
@@ -5604,7 +5603,7 @@ public class SharePartitionTest {
         assertEquals(5, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).batchState());
-        assertEquals(0, 
sharePartition.cachedState().get(5L).batchDeliveryCount());
+        assertEquals(1, 
sharePartition.cachedState().get(5L).batchDeliveryCount());
         assertNull(sharePartition.cachedState().get(5L).offsetState());
         // Acquisition lock timer task would be cancelled by the release 
acquired records operation.
         
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
@@ -5649,7 +5648,7 @@ public class SharePartitionTest {
         assertEquals(5, sharePartition.nextFetchOffset());
         // Check cached state.
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
-        expectedOffsetStateMap.put(5L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(5L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(6L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
 
@@ -5663,8 +5662,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
 
         // Acquisition lock timer task would be cancelled by the release 
acquired records operation.
@@ -6853,7 +6852,7 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(21L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(22L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(23L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(24L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(24L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
 
         assertEquals(0, sharePartition.deliveryCompleteCount());
 
@@ -6869,8 +6868,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(35L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(36L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(37L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(38L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(39L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(38L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(39L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(35L).offsetState());
     }
 
@@ -6932,10 +6931,10 @@ public class SharePartitionTest {
 
         Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
 
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
 
@@ -6944,7 +6943,7 @@ public class SharePartitionTest {
     }
 
     @Test
-    public void testReleaseAcquiredRecordsDecreaseDeliveryCount() {
+    public void testReleaseAcquiredRecordsDoNotDecreaseDeliveryCount() {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
@@ -6986,10 +6985,10 @@ public class SharePartitionTest {
         // After release, the delivery count was decremented.
         expectedOffsetStateMap = new HashMap<>();
         expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
         assertEquals(2, sharePartition.deliveryCompleteCount());
     }
@@ -11252,8 +11251,8 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID));

Reply via email to