AndrewJSchofield commented on code in PR #21678:
URL: https://github.com/apache/kafka/pull/21678#discussion_r2905295645


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########


Review Comment:
   I know this is adjacent code and not strictly part of this PR, but could 
you please add the missing Javadoc to this method? Thanks.



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -12427,6 +12427,183 @@ public void 
testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() {
         assertFalse(sharePartition.cachedState().isEmpty());
     }
 
+    @Test
+    public void testMaxInFlightRecordsUsesGroupConfigWhenPresent() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(2000)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // maxInFlightRecords() should return the group config value, not the 
default.
+        assertEquals(5000, sharePartition.maxInFlightRecords());
+    }
+
+    @Test
+    public void testMaxInFlightRecordsFallsBackToDefaultWhenNoGroupConfig() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(2000)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // maxInFlightRecords() should return the default value.
+        assertEquals(2000, sharePartition.maxInFlightRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksDecrease() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(100)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(0, 50);
+
+        // Acquire 50 records, which is under the default limit of 100.
+        fetchAcquiredRecords(sharePartition, records, 50);
+
+        // Dynamically decrease the limit to 30 via group config.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(30);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        // The effective limit should now be 30.
+        assertEquals(30, sharePartition.maxInFlightRecords());
+
+        // 50 in-flight > 30 limit, but canAcquireRecords checks 
nextFetchOffset != endOffset + 1
+        // first. Since all records [0-49] are acquired, nextFetchOffset == 
endOffset + 1 == 50,
+        // so the second check (numInFlightRecords < maxInFlightRecords) 
applies: 50 < 30 is false.
+        assertFalse(sharePartition.canAcquireRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksIncrease() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(10)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(0, 10);
+
+        // Acquire 10 records, hitting the default limit.
+        fetchAcquiredRecords(sharePartition, records, 10);
+
+        // canAcquireRecords should be false with default limit of 10.
+        assertFalse(sharePartition.canAcquireRecords());
+
+        // Increase limit to 500 via group config.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        assertEquals(500, sharePartition.maxInFlightRecords());
+
+        // Now canAcquireRecords should be true: 10 < 500.
+        assertTrue(sharePartition.canAcquireRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksExactBoundary() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(50)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(0, 50);
+
+        // Acquire exactly 50 records, hitting the default limit exactly.
+        fetchAcquiredRecords(sharePartition, records, 50);
+
+        // At exact boundary: 50 < 50 is false.
+        assertFalse(sharePartition.canAcquireRecords());
+
+        // Dynamically set limit to exactly the in-flight count via group 
config.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(50);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        // Still at boundary: 50 < 50 is false.
+        assertFalse(sharePartition.canAcquireRecords());
+
+        // Increase by 1 to cross the boundary.
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(51);
+
+        // Now 50 < 51 is true.
+        assertTrue(sharePartition.canAcquireRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksRemoveGroupConfig() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(100)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Group config sets limit to 500.
+        assertEquals(500, sharePartition.maxInFlightRecords());
+
+        // Remove group config — should fall back to default of 100.
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        assertEquals(100, sharePartition.maxInFlightRecords());
+    }
+
+    @Test
+    public void 
testDynamicPartitionMaxRecordLocksDecreaseBelowInFlightAffectsMaxRecordsToAcquire()
 {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(100)

Review Comment:
   This is really the default max in-flight records, I think.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java:
##########
@@ -43,6 +43,14 @@ public class ShareGroupConfig {
     public static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT = 
2000;
     public static final String SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC = 
"Share-group record lock limit per share-partition.";
 
+    public static final String 
SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG = 
"group.share.max.partition.max.record.locks";
+    public static final int SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT 
= 10000;

Review Comment:
   The default for the maximum according to the KIP is 4000.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to