josefk31 commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2879486108


##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,23 +1081,66 @@ public void testSegmentsLessThanLatestSnapshot() throws 
IOException {
         );
     }
 
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3})
+    public void testReadRespectsMaxSizeInBytes(int expectedBatches) throws 
IOException {
+        // 5 records are written in batches of 101 bytes each (at time of 
writing).
+        int magicMaxBatchSizeBytes = 101;
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                magicMaxBatchSizeBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        LogFetchInfo info = log.read(
+                0,
+                Isolation.UNCOMMITTED,
+                magicMaxBatchSizeBytes * expectedBatches
+        );
+        // Asserts that we have exactly B * R records (B = expectedBatches, R = 
recordsPerBatch). Furthermore, there must be B batches of SimpleRecords, each batch holding the values
+        // [0..R-1] converted to UTF-8 strings, with empty keys and headers.
+        int count = 0;
+        for (Record record : info.records.records()) {
+            byte[] expectedValue = String.valueOf(count % 
recordsPerBatch).getBytes(StandardCharsets.UTF_8);
+            SimpleRecord expected = new SimpleRecord(expectedValue);
+            SimpleRecord actual = new SimpleRecord(
+                    record.timestamp(),
+                    record.key(),
+                    record.value(),
+                    record.headers()
+            );
+
+            assertEquals(expected, actual);
+            count += 1;
+        }
+        assertEquals(recordsPerBatch * expectedBatches, count);
+
+    }
+
     private static MetadataLogConfig createMetadataLogConfig(
             int internalLogSegmentBytes,
             long logSegmentMillis,
             long retentionMaxBytes,
             long retentionMillis,
-            int internalMaxBatchSizeInBytes, //: Int = 
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-            int internalMaxFetchSizeInBytes //: Int = 
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+            int internalMaxBatchSizeInBytes
     ) {
         Map<String, ?> config = Map.of(
                 MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, 
internalLogSegmentBytes,
                 MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, 
logSegmentMillis,
                 MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG, 
retentionMaxBytes,
                 MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG, 
retentionMillis,
                 
MetadataLogConfig.INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG, 
internalMaxBatchSizeInBytes,
-                
MetadataLogConfig.INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG, 
internalMaxFetchSizeInBytes,
                 
MetadataLogConfig.INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG, 
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
-                );
+        );

Review Comment:
   I'll consider undoing this indentation change to the closing `);`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to