ahuang98 commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2688244489


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +93,75 @@ private static void 
testFetchResponseWithInvalidRecord(MemoryRecords records, in
         assertEquals(oldLogEndOffset, context.log.endOffset().offset());
     }
 
+    @Test
+    void testFetchRequestObeysConfiguredMaximumBytesToFetch() throws Exception 
{
+        // Create an explicit test to check that 
controller.quorum.fetch.max.size.bytes is used to construct fetch
+        // requests.
+        int epoch = 2;
+        int localId = KafkaRaftClientTest.randomReplicaId();
+        ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+        ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, 
true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, electedLeader.id())
+            // Explicitly change the configuration here.
+            .withFetchMaxSizeBytes(1024)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        // assertFetchRequestData contains a check which verifies the 
SizeBytes field of the Fetch request.
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
OptionalLong.empty());
+    }
+
+    @Test
+    public void testMaxBytesRequestedFromLogsRespectsValueInFetchRequest() 
throws Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        var localMaxSizeBytes = 1024;
+        var remoteMaxSizeBytes = 512;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+                localKey.id(),
+                localKey.directoryId().get()
+        )
+                .appendToLog(epoch, List.of("a", "b", "c"))
+                .appendToLog(epoch, List.of("d", "e", "f"))
+                .withStartingVoters(
+                        VoterSetTest.voterSet(Stream.of(localKey, remoteKey)), 
KRaftVersion.KRAFT_VERSION_1
+                )
+                .withUnknownLeader(epoch)
+                .withFetchMaxSizeBytes(localMaxSizeBytes)
+                .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // The next read from MockLog will be intended for a fetch request.
+        // We wish to assert that it uses the value supplied from the fetch.
+        context.log.setExpectedMaxTotalRecordsSizeBytes(remoteMaxSizeBytes);
+
+        // Send a fetch request with max bytes that are different from the 
configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, 
epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = 
context.assertSentFetchPartitionResponse();
+        // Failures for this test will appear in error-logs.

Review Comment:
   what does this mean? 



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +93,75 @@ private static void 
testFetchResponseWithInvalidRecord(MemoryRecords records, in
         assertEquals(oldLogEndOffset, context.log.endOffset().offset());
     }
 
+    @Test
+    void testFetchRequestObeysConfiguredMaximumBytesToFetch() throws Exception 
{
+        // Create an explicit test to check that 
controller.quorum.fetch.max.size.bytes is used to construct fetch

Review Comment:
   did you mean to keep this? maybe we can reword into a description for the 
test



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws 
IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single 
record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, 
with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }
+
+    @Test
+    public void testNonDefaultReadFromLog() throws IOException {
+        int batchSizeBytes = 1024;
+        int maxSizeToReadBytes = 1;
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                batchSizeBytes,
+                maxSizeToReadBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        // Default value of 1 is NOT used in this case.
+        LogFetchInfo info = log.read(0,
+                Isolation.UNCOMMITTED,
+                batchSizeBytes * 3);

Review Comment:
   any reason for this value in particular?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1242,7 +1242,7 @@ public void testFetchResponseWithSnapshotId(boolean 
withKip853Rpc) throws Except
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            context.fetchSnapshotMaxSizeBytes

Review Comment:
   similar comment above here that says we validate the maxbytes field?



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws 
IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single 
record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, 
with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }
+
+    @Test
+    public void testNonDefaultReadFromLog() throws IOException {
+        int batchSizeBytes = 1024;
+        int maxSizeToReadBytes = 1;
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                batchSizeBytes,
+                maxSizeToReadBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        // Default value of 1 is NOT used in this case.
+        LogFetchInfo info = log.read(0,
+                Isolation.UNCOMMITTED,
+                batchSizeBytes * 3);
+
+        assertRecords(info, recordsPerBatch * 2, recordsPerBatch);
+    }
+
+    private static void assertRecords(LogFetchInfo info, int numberExpected, 
int recordsPerBatch) {

Review Comment:
   java doc!



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws 
IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single 
record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, 
with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }

Review Comment:
   what about a case where the first batch has 2 records?



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws 
IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {

Review Comment:
   `DefaultLogValue` is a bit generic, what about something like 
`testReadRespectsDefaultInternalMaxFetchSize`



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws 
IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single 
record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, 
with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }
+
+    @Test
+    public void testNonDefaultReadFromLog() throws IOException {
+        int batchSizeBytes = 1024;
+        int maxSizeToReadBytes = 1;
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                batchSizeBytes,
+                maxSizeToReadBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        // Default value of 1 is NOT used in this case.

Review Comment:
   NOT used in the case we supply a `maxTotalRecordsSizeBytes`



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -1866,8 +1888,7 @@ FetchRequestData fetchRequest(
         long fetchOffset,
         int lastFetchedEpoch,
         OptionalLong highWatermark,
-        int maxWaitTimeMs
-    ) {
+        int maxWaitTimeMs) {

Review Comment:
   nit, format



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1321,7 +1323,7 @@ public void testFetchSnapshotResponsePartialData(boolean 
withKip853Rpc) throws E
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            context.fetchSnapshotMaxSizeBytes

Review Comment:
   why is this not `maxFetchSnapshotSizeBytes`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to