This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d78b4ec65d1 MINOR: Use explicit AcknowledgeType enums in
SharePartitionTest (#20802)
d78b4ec65d1 is described below
commit d78b4ec65d10495ee8dd6d08eabf7190066cab2e
Author: jimmy <[email protected]>
AuthorDate: Sun Nov 2 19:08:11 2025 +0800
MINOR: Use explicit AcknowledgeType enums in SharePartitionTest (#20802)
This PR updates the `SharePartitionTest.java` unit tests to use
`AcknowledgeType` enums instead of hardcoded byte values. This makes the
tests more readable and maintainable, and ensures that acknowledgement
types are referenced consistently throughout the codebase.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/server/share/SharePartitionTest.java | 340 +++++++++++----------
1 file changed, 171 insertions(+), 169 deletions(-)
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 36a2d0a1ac5..eeb8ce3002e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -2317,7 +2317,7 @@ public class SharePartitionTest {
// Release middle batch.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(15, 19, List.of((byte) 2))));
+ List.of(new ShareAcknowledgementBatch(15, 19,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
// Validate the nextFetchOffset is updated to 15.
@@ -2342,7 +2342,7 @@ public class SharePartitionTest {
// Release last offset of the acquired batch. Only 1 record should be
released and later acquired.
ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(29, 29, List.of((byte) 2))));
+ List.of(new ShareAcknowledgementBatch(29, 29,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
// Validate the nextFetchOffset is updated to 29.
@@ -2390,7 +2390,7 @@ public class SharePartitionTest {
// Release middle batch.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
+ List.of(new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
// Validate the nextFetchOffset is updated to 5.
@@ -2442,7 +2442,7 @@ public class SharePartitionTest {
// Release only 1 middle batch.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))));
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
// Validate the nextFetchOffset is updated to 5.
@@ -2637,7 +2637,7 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(1, 1, List.of((byte) 1))));
+ List.of(new ShareAcknowledgementBatch(1, 1,
List.of(AcknowledgeType.ACCEPT.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -2666,7 +2666,7 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 1))));
+ List.of(new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.ACCEPT.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -2701,11 +2701,11 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(
- new ShareAcknowledgementBatch(5, 6, List.of((byte) 2)),
+ new ShareAcknowledgementBatch(5, 6,
List.of(AcknowledgeType.RELEASE.id)),
new ShareAcknowledgementBatch(10, 18, List.of(
- (byte) 2, (byte) 2, (byte) 2,
- (byte) 2, (byte) 2, (byte) 0,
- (byte) 0, (byte) 0, (byte) 1
+ AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id,
AcknowledgeType.RELEASE.id,
+ AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id,
ACKNOWLEDGE_TYPE_GAP_ID,
+ ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID,
AcknowledgeType.ACCEPT.id
))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -2763,11 +2763,11 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(new ShareAcknowledgementBatch(6, 18, List.of(
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 0, (byte) 0, (byte) 1,
- (byte) 0, (byte) 1, (byte) 0,
- (byte) 1))));
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id,
ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -2806,7 +2806,7 @@ public class SharePartitionTest {
// Acknowledge a batch when cache is empty.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(0, 15, List.of((byte) 3))));
+ List.of(new ShareAcknowledgementBatch(0, 15,
List.of(AcknowledgeType.REJECT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
@@ -2820,7 +2820,7 @@ public class SharePartitionTest {
ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(20, 25, List.of((byte) 3))));
+ List.of(new ShareAcknowledgementBatch(20, 25,
List.of(AcknowledgeType.REJECT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRequestException.class, ackResult);
assertEquals(0, sharePartition.inFlightTerminalRecords());
@@ -2844,8 +2844,8 @@ public class SharePartitionTest {
// Acknowledge a batch when first batch violates the range.
List<ShareAcknowledgementBatch> acknowledgeBatches = List.of(
- new ShareAcknowledgementBatch(0, 10, List.of((byte) 1)),
- new ShareAcknowledgementBatch(20, 24, List.of((byte) 1)));
+ new ShareAcknowledgementBatch(0, 10,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(20, 24,
List.of(AcknowledgeType.ACCEPT.id)));
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID, acknowledgeBatches);
assertTrue(ackResult.isCompletedExceptionally());
@@ -2881,7 +2881,7 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
"member-2",
- List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 3))));
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.REJECT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
}
@@ -2899,7 +2899,7 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))));
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -2909,7 +2909,7 @@ public class SharePartitionTest {
// Acknowledge the same batch again but with ACCEPT type.
ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 1))));
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
@@ -2921,7 +2921,7 @@ public class SharePartitionTest {
ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(6, 8, List.of((byte) 3))));
+ List.of(new ShareAcknowledgementBatch(6, 8,
List.of(AcknowledgeType.REJECT.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
assertEquals(3, sharePartition.inFlightTerminalRecords());
@@ -2929,7 +2929,7 @@ public class SharePartitionTest {
// Re-acknowledge the subset batch with REJECT type.
ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(6, 8, List.of((byte) 3))));
+ List.of(new ShareAcknowledgementBatch(6, 8,
List.of(AcknowledgeType.REJECT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
assertEquals(3, sharePartition.inFlightTerminalRecords());
@@ -2958,11 +2958,11 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(
- new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)),
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)),
- new ShareAcknowledgementBatch(15, 19, List.of((byte) 1)),
+ new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(15, 19,
List.of(AcknowledgeType.ACCEPT.id)),
// Add another batch which should fail the request.
- new ShareAcknowledgementBatch(15, 19, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(15, 19,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
@@ -2997,11 +2997,11 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
List.of(
- new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)),
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)),
- new ShareAcknowledgementBatch(15, 19, List.of((byte) 1)),
+ new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(15, 19,
List.of(AcknowledgeType.ACCEPT.id)),
// Add another batch which should fail the request.
- new ShareAcknowledgementBatch(16, 19, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(16, 19,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
@@ -3027,7 +3027,7 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(12, 13, List.of((byte) 2))));
+ List.of(new ShareAcknowledgementBatch(12, 13,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -3095,7 +3095,7 @@ public class SharePartitionTest {
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(12, 30, List.of((byte) 2))));
+ List.of(new ShareAcknowledgementBatch(12, 30,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -4067,7 +4067,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
assertEquals(1, sharePartition.timer().size());
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(0, 0, List.of((byte) 2))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RELEASE.id))));
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
assertEquals(0, sharePartition.timer().size());
@@ -4101,7 +4101,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
assertEquals(1, sharePartition.timer().size());
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.RELEASE.id))));
assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
@@ -4154,7 +4154,7 @@ public class SharePartitionTest {
sharePartition.acknowledge(MEMBER_ID,
// Do not send gap offsets to verify that they are ignored and
accepted as per client ack.
- List.of(new ShareAcknowledgementBatch(5, 18, List.of((byte)
1))));
+ List.of(new ShareAcknowledgementBatch(5, 18,
List.of(AcknowledgeType.ACCEPT.id))));
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
@@ -4282,11 +4282,11 @@ public class SharePartitionTest {
// Acknowledging over subset of both batch with subset of gap offsets.
sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(
6, 18, List.of(
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 0, (byte) 0, (byte) 1,
- (byte) 0, (byte) 1, (byte) 0,
- (byte) 1))));
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id,
ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id))));
assertNotNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
@@ -4551,7 +4551,7 @@ public class SharePartitionTest {
// Acknowledge with ACCEPT type should throw
InvalidRecordStateException since they've been released due to acquisition lock
timeout.
CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte)
1))));
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
@@ -4560,7 +4560,7 @@ public class SharePartitionTest {
// Try acknowledging with REJECT type should throw
InvalidRecordStateException since they've been released due to acquisition lock
timeout.
ackResult = sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte)
3))));
+ List.of(new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.REJECT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
@@ -4580,8 +4580,8 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
assertEquals(1, sharePartition.timer().size());
- // Acknowledge with REJECT type.
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(5, 6, List.of((byte) 2))));
+ // Acknowledge with RELEASE type.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(5, 6, List.of(AcknowledgeType.RELEASE.id))));
assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
@@ -4592,7 +4592,7 @@ public class SharePartitionTest {
assertEquals(0, sharePartition.inFlightTerminalRecords());
// Acknowledge with ACCEPT type.
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(8, 9, List.of((byte) 1))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id))));
assertNull(sharePartition.cachedState().get(5L).offsetState().get(5L).acquisitionLockTimeoutTask());
assertNull(sharePartition.cachedState().get(5L).offsetState().get(6L).acquisitionLockTimeoutTask());
@@ -4747,7 +4747,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(8, 9, List.of((byte) 1))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id))));
assertEquals(2, sharePartition.inFlightTerminalRecords());
@@ -4835,7 +4835,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(8, 9, List.of((byte) 1))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(8, 9, List.of(AcknowledgeType.ACCEPT.id))));
// Mock persister writeState method so that
sharePartition.isWriteShareGroupStateSuccessful() returns false.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
@@ -4919,7 +4919,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records1, 2);
fetchAcquiredRecords(sharePartition, records2, 9);
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(5, 18, List.of((byte) 1))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(5, 18, List.of(AcknowledgeType.ACCEPT.id))));
// After the acknowledgements, the cached state has 11 Terminal
records ->
// (5 -> 6)
// (10 -> 18)
@@ -4957,11 +4957,11 @@ public class SharePartitionTest {
// Acknowledging over subset of both batch with subset of gap offsets.
sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(6, 18, List.of(
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 0, (byte) 0, (byte) 1,
- (byte) 0, (byte) 1, (byte) 0,
- (byte) 1))));
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id,
ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id))));
// After the acknowledgements, the cached state has 10 Terminal
records ->
// 6
@@ -5014,9 +5014,9 @@ public class SharePartitionTest {
// Acknowledging over subset of second batch with subset of gap
offsets.
sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(10, 18, List.of(
- (byte) 1, (byte) 1, (byte) 0, (byte) 0,
- (byte) 1, (byte) 0, (byte) 1, (byte) 0,
- (byte) 1))));
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID,
AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id))));
// After the acknowledgements, the cached state has 9 Terminal records
->
// (10 -> 18)
@@ -5090,9 +5090,9 @@ public class SharePartitionTest {
// Acknowledging over subset of second batch with subset of gap
offsets.
sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(10, 18, List.of(
- (byte) 1, (byte) 1, (byte) 0, (byte) 0,
- (byte) 1, (byte) 0, (byte) 1, (byte) 0,
- (byte) 1))));
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID,
AcknowledgeType.ACCEPT.id, ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id))));
// After the acknowledgements, the cached state has 9 Terminal records
->
// (10 -> 18)
@@ -5124,7 +5124,7 @@ public class SharePartitionTest {
// Ack subset of records by "member-2".
sharePartition.acknowledge("member-2",
- List.of(new ShareAcknowledgementBatch(5, 5, List.of((byte)
1))));
+ List.of(new ShareAcknowledgementBatch(5, 5,
List.of(AcknowledgeType.ACCEPT.id))));
// After the acknowledgements, the startOffset will be updated to 6,
since offset 5 is Terminal. Hence
// inFlightTerminalRecords will remain 9.
assertEquals(9, sharePartition.inFlightTerminalRecords());
@@ -5173,10 +5173,10 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 6, List.of((byte)
2))));
+ List.of(new ShareAcknowledgementBatch(5, 6,
List.of(AcknowledgeType.RELEASE.id))));
sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte)
1))));
+ List.of(new ShareAcknowledgementBatch(8, 9,
List.of(AcknowledgeType.ACCEPT.id))));
assertEquals(2, sharePartition.inFlightTerminalRecords());
@@ -5208,7 +5208,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records2, 5);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(0, sharePartition.inFlightTerminalRecords());
fetchAcquiredRecords(sharePartition, records2, 5);
@@ -5242,9 +5242,9 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records3, 5);
sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of(
- new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)),
- new ShareAcknowledgementBatch(17, 19, List.of((byte) 3)),
- new ShareAcknowledgementBatch(20, 24, List.of((byte) 2))
+ new ShareAcknowledgementBatch(13, 16,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(17, 19,
List.of(AcknowledgeType.REJECT.id)),
+ new ShareAcknowledgementBatch(20, 24,
List.of(AcknowledgeType.RELEASE.id))
)));
assertEquals(3, sharePartition.inFlightTerminalRecords());
@@ -5306,10 +5306,10 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records3, 5);
sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of(
- new ShareAcknowledgementBatch(10, 12, List.of((byte) 1)),
- new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)),
- new ShareAcknowledgementBatch(17, 19, List.of((byte) 3)),
- new ShareAcknowledgementBatch(20, 24, List.of((byte) 2))
+ new ShareAcknowledgementBatch(10, 12,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(13, 16,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(17, 19,
List.of(AcknowledgeType.REJECT.id)),
+ new ShareAcknowledgementBatch(20, 24,
List.of(AcknowledgeType.RELEASE.id))
)));
// After acknowledgements, since offsets 10 -> 12 are at the start of
the cached state and are in Terminal state,
@@ -5323,8 +5323,8 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records3, 5);
sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of(
- new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)),
- new ShareAcknowledgementBatch(20, 24, List.of((byte) 2))
+ new ShareAcknowledgementBatch(13, 16,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(20, 24,
List.of(AcknowledgeType.RELEASE.id))
)));
assertEquals(25, sharePartition.nextFetchOffset());
@@ -5339,7 +5339,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 7);
sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 7, List.of((byte)
1))));
+ List.of(new ShareAcknowledgementBatch(5, 7,
List.of(AcknowledgeType.ACCEPT.id))));
assertEquals(0, sharePartition.inFlightTerminalRecords());
@@ -5408,7 +5408,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 6), 6);
sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte)
1))));
+ List.of(new ShareAcknowledgementBatch(8, 9,
List.of(AcknowledgeType.ACCEPT.id))));
assertEquals(2, sharePartition.inFlightTerminalRecords());
@@ -5483,11 +5483,11 @@ public class SharePartitionTest {
// Acknowledging over subset of both batch with subset of gap offsets.
sharePartition.acknowledge(MEMBER_ID,
List.of(new ShareAcknowledgementBatch(6, 18, List.of(
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 1, (byte) 1, (byte) 1,
- (byte) 0, (byte) 0, (byte) 1,
- (byte) 0, (byte) 1, (byte) 0,
- (byte) 1))));
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID,
AcknowledgeType.ACCEPT.id,
+ ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.ACCEPT.id,
ACKNOWLEDGE_TYPE_GAP_ID,
+ AcknowledgeType.ACCEPT.id))));
assertEquals(10, sharePartition.inFlightTerminalRecords());
@@ -5659,10 +5659,10 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(32, 5), 5);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(2, 6, List.of((byte) 1)),
- new ShareAcknowledgementBatch(12, 16, List.of((byte) 2)),
- new ShareAcknowledgementBatch(22, 26, List.of((byte) 2)),
- new ShareAcknowledgementBatch(27, 31, List.of((byte) 3))
+ new ShareAcknowledgementBatch(2, 6,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(12, 16,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(22, 26,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(27, 31,
List.of(AcknowledgeType.REJECT.id))
));
// After the acknowledgements, the records in Terminal state are ->
@@ -5722,10 +5722,10 @@ public class SharePartitionTest {
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(2, 6, List.of((byte) 2)),
- new ShareAcknowledgementBatch(12, 16, List.of((byte) 3)),
- new ShareAcknowledgementBatch(22, 26, List.of((byte) 2)),
- new ShareAcknowledgementBatch(27, 31, List.of((byte) 3))
+ new ShareAcknowledgementBatch(2, 6,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(12, 16,
List.of(AcknowledgeType.REJECT.id)),
+ new ShareAcknowledgementBatch(22, 26,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(27, 31,
List.of(AcknowledgeType.REJECT.id))
));
// After the acknowledgements, the records in Terminal state are ->
@@ -5788,11 +5788,11 @@ public class SharePartitionTest {
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(2, 6, List.of((byte) 2)),
- new ShareAcknowledgementBatch(12, 16, List.of((byte) 3)),
- new ShareAcknowledgementBatch(19, 21, List.of((byte) 3)),
- new ShareAcknowledgementBatch(22, 26, List.of((byte) 2)),
- new ShareAcknowledgementBatch(27, 31, List.of((byte) 3))
+ new ShareAcknowledgementBatch(2, 6,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(12, 16,
List.of(AcknowledgeType.REJECT.id)),
+ new ShareAcknowledgementBatch(19, 21,
List.of(AcknowledgeType.REJECT.id)),
+ new ShareAcknowledgementBatch(22, 26,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(27, 31,
List.of(AcknowledgeType.REJECT.id))
));
// After the acknowledgements, the records in Terminal state are ->
@@ -5860,8 +5860,8 @@ public class SharePartitionTest {
// 3. 31 -> 40: AVAILABLE
// 4. 41 -> 50: ACQUIRED
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
- new ShareAcknowledgementBatch(31, 40, List.of((byte) 2))
+ new ShareAcknowledgementBatch(11, 20,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(31, 40,
List.of(AcknowledgeType.RELEASE.id))
));
assertEquals(0, sharePartition.inFlightTerminalRecords());
@@ -5890,7 +5890,7 @@ public class SharePartitionTest {
// The client acknowledges the batch 21 -> 30. Since this batch is
before the LSO, nothing will be done and these
// records will remain in the ACQUIRED state.
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21L, 30L, List.of(AcknowledgeType.RELEASE.id))));
// The acknowledgements make no difference to in flight records.
assertEquals(0, sharePartition.inFlightTerminalRecords());
@@ -5921,8 +5921,8 @@ public class SharePartitionTest {
// 3. 31 -> 40: AVAILABLE
// 4. 41 -> 50: ACQUIRED
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
- new ShareAcknowledgementBatch(31, 40, List.of((byte) 2))
+ new ShareAcknowledgementBatch(11, 20,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(31, 40,
List.of(AcknowledgeType.RELEASE.id))
));
assertEquals(0, sharePartition.inFlightTerminalRecords());
@@ -5960,7 +5960,7 @@ public class SharePartitionTest {
// The client acknowledges the batch 21 -> 30. Since this batch is
before the LSO, nothing will be done and these
// records will remain in the ACQUIRED state.
- sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21L, 30L, List.of(AcknowledgeType.RELEASE.id))));
assertEquals(0, sharePartition.inFlightTerminalRecords());
// The batch is still in ACQUIRED state.
@@ -5982,7 +5982,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(4, 8, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(4, 8,
List.of(AcknowledgeType.ACCEPT.id))));
// LSO is at 5.
sharePartition.updateCacheAndOffsets(5);
@@ -6117,7 +6117,7 @@ public class SharePartitionTest {
// Acknowledge with ACCEPT action.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(7, 8, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(7, 8,
List.of(AcknowledgeType.ACCEPT.id))));
// LSO is at 7.
sharePartition.updateCacheAndOffsets(7);
@@ -6163,7 +6163,7 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(7, 8, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(7, 8,
List.of(AcknowledgeType.RELEASE.id))));
// LSO is at 7.
sharePartition.updateCacheAndOffsets(7);
@@ -6195,7 +6195,7 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(7, 8, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(7, 8,
List.of(AcknowledgeType.RELEASE.id))));
// LSO is at 11.
sharePartition.updateCacheAndOffsets(11);
@@ -6227,8 +6227,8 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(7, 8, List.of((byte) 2)),
- new ShareAcknowledgementBatch(11, 11, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(7, 8,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(11, 11,
List.of(AcknowledgeType.RELEASE.id))));
// LSO is at 11.
sharePartition.updateCacheAndOffsets(11);
@@ -6260,7 +6260,7 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(7, 8, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(7, 8,
List.of(AcknowledgeType.RELEASE.id))));
// LSO is at 12.
sharePartition.updateCacheAndOffsets(12);
@@ -6350,7 +6350,7 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.RELEASE.id))));
// LSO is at 10.
sharePartition.updateCacheAndOffsets(10);
@@ -6382,9 +6382,11 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records2, 9);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 6, List.of((byte) 2)),
+ new ShareAcknowledgementBatch(5, 6,
List.of(AcknowledgeType.RELEASE.id)),
new ShareAcknowledgementBatch(10, 18, List.of(
- (byte) 2, (byte) 2, (byte) 2, (byte) 2, (byte) 2, (byte) 0, (byte) 0, (byte) 0, (byte) 2
+ AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id,
+ AcknowledgeType.RELEASE.id, AcknowledgeType.RELEASE.id, ACKNOWLEDGE_TYPE_GAP_ID,
+ ACKNOWLEDGE_TYPE_GAP_ID, ACKNOWLEDGE_TYPE_GAP_ID, AcknowledgeType.RELEASE.id
))));
// LSO is at 18.
@@ -6459,7 +6461,7 @@ public class SharePartitionTest {
// Acknowledge the acquired records. Since these records are present before the startOffset, these acknowledgements
// will simply be ignored.
- sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of((byte) 2))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of(AcknowledgeType.RELEASE.id))));
// Since the acknowledgements are ignored, the inFlightTerminalRecords should not change.
assertEquals(0, sharePartition.inFlightTerminalRecords());
@@ -6598,7 +6600,7 @@ public class SharePartitionTest {
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
// Acknowledge the acquired records. Only those records that are after the startOffset will be acknowledged.
// In this case, records 11 -> 15 will remain in the ACQUIRED state, while records 16 -> 20 will be RELEASED.
- sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of((byte) 2))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(11L, 20L, List.of(AcknowledgeType.RELEASE.id))));
assertEquals(0, sharePartition.inFlightTerminalRecords());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(11L).offsetState().get(11L).state());
@@ -6655,10 +6657,10 @@ public class SharePartitionTest {
// Acknowledge records.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(6, 7, List.of((byte) 1)),
- new ShareAcknowledgementBatch(8, 8, List.of((byte) 2)),
- new ShareAcknowledgementBatch(25, 29, List.of((byte) 2)),
- new ShareAcknowledgementBatch(35, 37, List.of((byte) 2))
+ new ShareAcknowledgementBatch(6, 7,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(8, 8,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(25, 29,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(35, 37,
List.of(AcknowledgeType.RELEASE.id))
));
assertEquals(2, sharePartition.inFlightTerminalRecords());
@@ -6795,7 +6797,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
- sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(12, 13, List.of((byte) 1))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(12, 13, List.of(AcknowledgeType.ACCEPT.id))));
// Records 12 and 13 are ACKNOWLEDGED.
assertEquals(2, sharePartition.inFlightTerminalRecords());
@@ -6858,10 +6860,10 @@ public class SharePartitionTest {
// Acknowledge records.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(6, 7, List.of((byte) 1)),
- new ShareAcknowledgementBatch(8, 8, List.of((byte) 2)),
- new ShareAcknowledgementBatch(25, 29, List.of((byte) 2)),
- new ShareAcknowledgementBatch(35, 37, List.of((byte) 2))
+ new ShareAcknowledgementBatch(6, 7,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(8, 8,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(25, 29,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(35, 37,
List.of(AcknowledgeType.RELEASE.id))
));
assertEquals(2, sharePartition.inFlightTerminalRecords());
@@ -7061,8 +7063,8 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action.
CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(2, 6, List.of((byte) 2)),
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(2, 6,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.RELEASE.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
@@ -7124,7 +7126,7 @@ public class SharePartitionTest {
// Acknowledge with ACCEPT action.
CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(2, 14, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(2, 14,
List.of(AcknowledgeType.ACCEPT.id))));
assertNull(ackResult.join());
assertFalse(ackResult.isCompletedExceptionally());
// Only record 14 is post startOffset and in a Terminal state. Thus, only that is considered for inFlightTerminalRecords.
@@ -7195,7 +7197,7 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action. This contains a batch that doesn't exist at all.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(2, 14, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(2, 14,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(10, sharePartition.nextFetchOffset());
assertEquals(10, sharePartition.startOffset());
@@ -7251,7 +7253,7 @@ public class SharePartitionTest {
// Acknowledge with RELEASE action. This contains a batch that doesn't exist at all.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(1, 7, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(1, 7,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(3, sharePartition.nextFetchOffset());
assertEquals(3, sharePartition.startOffset());
@@ -7502,7 +7504,7 @@ public class SharePartitionTest {
assertFalse(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 249, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(0, 249, List.of(AcknowledgeType.ACCEPT.id))));
assertEquals(250, sharePartition.nextFetchOffset());
// The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED.
@@ -7523,7 +7525,7 @@ public class SharePartitionTest {
assertFalse(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 249, List.of((byte) 3))));
+ new ShareAcknowledgementBatch(0, 249,
List.of((AcknowledgeType.REJECT.id)))));
assertEquals(250, sharePartition.nextFetchOffset());
// The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED.
@@ -7543,7 +7545,7 @@ public class SharePartitionTest {
assertFalse(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 249, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(0, 249,
List.of(AcknowledgeType.RELEASE.id))));
// The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED.
assertEquals(0, sharePartition.startOffset());
@@ -7571,7 +7573,7 @@ public class SharePartitionTest {
assertFalse(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 12, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(0, 12,
List.of(AcknowledgeType.ACCEPT.id))));
assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(0L).offsetState().get(12L).state());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(13L).state());
@@ -7597,7 +7599,7 @@ public class SharePartitionTest {
assertFalse(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 14, List.of((byte) 3))));
+ new ShareAcknowledgementBatch(0, 14,
List.of(AcknowledgeType.REJECT.id))));
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).batchState());
assertEquals(MEMBER_ID,
sharePartition.cachedState().get(15L).batchMemberId());
@@ -7624,7 +7626,7 @@ public class SharePartitionTest {
assertFalse(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 3))));
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.REJECT.id))));
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(9L).state());
assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(0L).offsetState().get(10L).state());
@@ -7655,7 +7657,7 @@ public class SharePartitionTest {
assertFalse(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 29, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(0, 29,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(sharePartition.canAcquireRecords());
assertEquals(30, sharePartition.startOffset());
@@ -7683,7 +7685,7 @@ public class SharePartitionTest {
// First Acknowledgement for the first batch of records 0-19.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 19, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(0, 19,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(sharePartition.canAcquireRecords());
assertEquals(20, sharePartition.startOffset());
@@ -7695,7 +7697,7 @@ public class SharePartitionTest {
assertTrue(sharePartition.canAcquireRecords());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(20, 49, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(20, 49,
List.of(AcknowledgeType.ACCEPT.id))));
assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(40L).offsetState().get(49L).state());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(40L).offsetState().get(50L).state());
@@ -7711,7 +7713,7 @@ public class SharePartitionTest {
// Final Acknowledgement, all records are acknowledged here.
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(50, 179, List.of((byte) 3))));
+ new ShareAcknowledgementBatch(50, 179,
List.of(AcknowledgeType.REJECT.id))));
assertEquals(0, sharePartition.cachedState().size());
assertTrue(sharePartition.canAcquireRecords());
@@ -7761,7 +7763,7 @@ public class SharePartitionTest {
// Sending acknowledgement for the first batch from 11 to 20
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(11, 20, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(11, 20,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(sharePartition.canAcquireRecords());
// After the acknowledgement is done successfully, maybeUpdateCachedStateAndOffsets method is invoked to see
@@ -7810,7 +7812,7 @@ public class SharePartitionTest {
assertEquals(249, sharePartition.endOffset());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 249, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(0, 249,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(sharePartition.canAcquireRecords());
assertEquals(250, sharePartition.startOffset());
@@ -7832,7 +7834,7 @@ public class SharePartitionTest {
assertEquals(249, sharePartition.endOffset());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 89, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(0, 89,
List.of(AcknowledgeType.RELEASE.id))));
// The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED.
assertEquals(0, sharePartition.startOffset());
@@ -7856,7 +7858,7 @@ public class SharePartitionTest {
assertEquals(249, sharePartition.endOffset());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 89, List.of((byte) 3))));
+ new ShareAcknowledgementBatch(0, 89,
List.of(AcknowledgeType.REJECT.id))));
// The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED.
assertEquals(90, sharePartition.startOffset());
@@ -7879,7 +7881,7 @@ public class SharePartitionTest {
assertEquals(249, sharePartition.endOffset());
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 89, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(0, 89, List.of(AcknowledgeType.ACCEPT.id))));
// The SPSO should only move when the initial records in cached state are acknowledged with type ACKNOWLEDGE or ARCHIVED.
assertEquals(90, sharePartition.startOffset());
@@ -7906,7 +7908,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10);
CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 1))));
+ List.of(new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(UnknownTopicOrPartitionException.class, ackResult);
@@ -7935,7 +7937,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 6), 6);
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(8, 10, List.of((byte) 3))));
+ List.of(new ShareAcknowledgementBatch(8, 10, List.of(AcknowledgeType.REJECT.id))));
assertTrue(ackResult.isCompletedExceptionally());
// Due to failure in writeShareGroupState, the cached state should not be updated.
@@ -7961,11 +7963,11 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 7);
sharePartition.acknowledge(MEMBER_ID,
- List.of(new ShareAcknowledgementBatch(5, 7, List.of((byte) 1))));
+ List.of(new ShareAcknowledgementBatch(5, 7, List.of(AcknowledgeType.ACCEPT.id))));
// Acknowledge subset with another member.
CompletableFuture<Void> ackResult =
sharePartition.acknowledge("member-2",
- List.of(new ShareAcknowledgementBatch(9, 11, List.of((byte) 1))));
+ List.of(new ShareAcknowledgementBatch(9, 11, List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
}
@@ -7981,10 +7983,10 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(15, 5), 5);
CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)),
+ new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.RELEASE.id)),
// Acknowledging batch with another member will cause failure and rollback.
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)),
- new ShareAcknowledgementBatch(15, 19, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(15, 19,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
@@ -8010,10 +8012,10 @@ public class SharePartitionTest {
sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15,
fetchPartitionData(memoryRecords(15, 5)), FETCH_ISOLATION_HWM);
CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)),
- new ShareAcknowledgementBatch(10, 14, List.of((byte) 1)),
+ new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(10, 14,
List.of(AcknowledgeType.ACCEPT.id)),
// Acknowledging subset with another member will cause failure and rollback.
- new ShareAcknowledgementBatch(16, 18, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(16, 18,
List.of(AcknowledgeType.ACCEPT.id))));
assertTrue(ackResult.isCompletedExceptionally());
assertFutureThrows(InvalidRecordStateException.class, ackResult);
@@ -8040,11 +8042,11 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records, 10);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
fetchAcquiredRecords(sharePartition, records, 10);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
// All the records in the batch reached the max delivery count, hence they got archived and the cached state cleared.
assertEquals(15, sharePartition.nextFetchOffset());
@@ -8068,9 +8070,9 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records2, 5);
sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of(
- new ShareAcknowledgementBatch(10, 12, List.of((byte) 1)),
- new ShareAcknowledgementBatch(13, 16, List.of((byte) 2)),
- new ShareAcknowledgementBatch(17, 19, List.of((byte) 1)))));
+ new ShareAcknowledgementBatch(10, 12,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(13, 16,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(17, 19,
List.of(AcknowledgeType.ACCEPT.id)))));
// Send next batch from offset 13, only 2 records should be acquired.
fetchAcquiredRecords(sharePartition, records1, 2);
@@ -8078,7 +8080,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records2, 2);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(13, 16, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(13, 16,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(20, sharePartition.nextFetchOffset());
// Cached state will be empty because after the second release, the acquired records will now have moved to
@@ -8098,12 +8100,12 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records1, 5);
sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of(
- new ShareAcknowledgementBatch(0, 1, List.of((byte) 2)))));
+ new ShareAcknowledgementBatch(0, 1,
List.of(AcknowledgeType.RELEASE.id)))));
// Send next batch from offset 0, only 2 records should be acquired.
fetchAcquiredRecords(sharePartition, memoryRecords(2), 2);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(0, 4, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(0, 4,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(2, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
@@ -8135,7 +8137,7 @@ public class SharePartitionTest {
assertEquals(20, sharePartition.nextFetchOffset());
sharePartition.acknowledge(memberId1, List.of(
- new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(5, 9,
List.of(AcknowledgeType.RELEASE.id))));
assertTrue(sharePartition.findNextFetchOffset());
assertEquals(5, sharePartition.nextFetchOffset());
@@ -8161,7 +8163,7 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.nextFetchOffset());
sharePartition.acknowledge(memberId1, List.of(
- new ShareAcknowledgementBatch(0, 2, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(0, 2,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.inFlightTerminalRecords());
@@ -8172,7 +8174,7 @@ public class SharePartitionTest {
assertEquals(5, sharePartition.nextFetchOffset());
sharePartition.acknowledge(memberId2, List.of(
- new ShareAcknowledgementBatch(3, 4, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(3, 4,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(3, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.inFlightTerminalRecords());
}
@@ -8185,7 +8187,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(2, 6, List.of((byte) 1))));
+ new ShareAcknowledgementBatch(2, 6,
List.of(AcknowledgeType.ACCEPT.id))));
// Acknowledge records will induce 1 write state RPC call via function isWriteShareGroupStateSuccessful.
Mockito.verify(sharePartition,
Mockito.times(1)).writeShareGroupState(anyList());
@@ -8204,10 +8206,10 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(10, 12), 12);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 11, List.of((byte) 2)),
- new ShareAcknowledgementBatch(12, 13, List.of((byte) 0)),
- new ShareAcknowledgementBatch(14, 15, List.of((byte) 2)),
- new ShareAcknowledgementBatch(17, 20, List.of((byte) 2))));
+ new ShareAcknowledgementBatch(5, 11,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(12, 13,
List.of(ACKNOWLEDGE_TYPE_GAP_ID)),
+ new ShareAcknowledgementBatch(14, 15,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(17, 20,
List.of(AcknowledgeType.RELEASE.id))));
// Records 12-13 have been identified as gaps, hence they are kept in the cache as ARCHIVED state.
assertEquals(2, sharePartition.inFlightTerminalRecords());
@@ -8296,8 +8298,8 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
List<ShareAcknowledgementBatch> acknowledgementBatches = new
ArrayList<>();
- acknowledgementBatches.add(new ShareAcknowledgementBatch(2, 3, List.of((byte) 2)));
- acknowledgementBatches.add(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)));
+ acknowledgementBatches.add(new ShareAcknowledgementBatch(2, 3, List.of(AcknowledgeType.RELEASE.id)));
+ acknowledgementBatches.add(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id)));
// Acknowledge 2-3, 5-9 offsets with RELEASE acknowledge type.
sharePartition.acknowledge(MEMBER_ID, acknowledgementBatches);