AndrewJSchofield commented on code in PR #20978:
URL: https://github.com/apache/kafka/pull/20978#discussion_r2557213968
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2018,7 +2018,14 @@ private boolean checkForStartOffsetWithinBatch(long
batchFirstOffset, long batch
*/
private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch,
long requestFirstOffset, long requestLastOffset) {
if (inFlightBatch.offsetState() == null) {
- return inFlightBatch.batchDeliveryCount() >=
throttleRecordsDeliveryLimit;
+ // If offsetState is null, it means the batch is not split and
represents a single batch.
+ // Check if the batch is in AVAILABLE state and has no ongoing
transition.
+ // The requested batch shall always be within the request first
and last offset as the sub
+ // map batches are only fetched to consider.
+ if (inFlightBatch.batchState() == RecordState.AVAILABLE &&
!inFlightBatch.batchHasOngoingStateTransition()) {
+ return inFlightBatch.batchDeliveryCount() >=
throttleRecordsDeliveryLimit;
+ }
+ return false;
Review Comment:
nit: Double space.
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11590,6 +11590,122 @@ public void
testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() {
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
}
+ @Test
+ public void testAcquisitionThrottlingWithOnGoingStateTransition() {
Review Comment:
nit: "Ongoing" is a single word, so
`testAcquisitionThrottlingWithOngoingStateTransition` is slightly better.
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11590,6 +11590,122 @@ public void
testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() {
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
}
+ @Test
+ public void testAcquisitionThrottlingWithOnGoingStateTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 15L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L,
RecordState.AVAILABLE.id, (short) 1),
+ // Batch of 20-24 has been set to delivery count of 2
so in next acquisition it will be 3,
+ // and post that it should be throttled but because of
pending state transition it
+ // should not be throttled.
+ new PersisterStateBatch(20L, 24L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(25L, 29L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(30L, 34L,
RecordState.AVAILABLE.id, (short) 2),
+ // Similarly, batch of 35-39 has been set to delivery
count of 2 so in next offset
+ // acquisition, some offsets will be at 3 delivery
count, and post that offsets
+ // should be throttled but because of pending state
transition they will not be throttled.
+ new PersisterStateBatch(35, 39L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(40, 44L,
RecordState.ARCHIVED.id, (short) 5),
+ new PersisterStateBatch(45, 49L,
RecordState.AVAILABLE.id, (short) 1)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ // Acquire batches 20-24 and 36-37 (offset based) and create a pending
state transition.
+ fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(20L).batchState());
+ fetchAcquiredRecords(sharePartition, memoryRecords(36, 2), 2);
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(35L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(38L).state());
+
+ // Create a pending future which will block state updates.
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+
+ // Release batch of 20-24 and offset 36-37, which will have pending
state transition.
+ sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(20, 24,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(36, 37,
List.of(AcknowledgeType.RELEASE.id))));
+
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+
+
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(35L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(36L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(37L).hasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(38L).hasOngoingStateTransition());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 5).close();
+ memoryRecordsBuilder(buffer, 20, 5).close();
+ memoryRecordsBuilder(buffer, 25, 5).close();
+ memoryRecordsBuilder(buffer, 30, 5).close();
+ memoryRecordsBuilder(buffer, 35, 5).close();
+ memoryRecordsBuilder(buffer, 40, 5).close();
+ memoryRecordsBuilder(buffer, 45, 5).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ // Acquire batches and batch 15-19, 25-29 will be acquired as batch
20-24 has pending state transition.
+ // Without pending transition, the acquisition would have happened
only for 20-24 batch as the batch
+ // 20-24 would have marked to be throttled but eventually couldn't be
acquired because of state transition.
+ // However, due to throttle check being incorrectly true the further
batch of 25-29 would have been skipped.
Review Comment:
nit: The final sentence of this comment is really a description of the
defect fixed in this PR, and it's not really helpful now that the defect is
understood. I suggest removing the final sentence of the comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]