mimaison commented on code in PR #21686:
URL: https://github.com/apache/kafka/pull/21686#discussion_r2910326998


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1913,6 +1943,777 @@ public void testRetentionSizeInPercentMetricUpdatedOnDeletionError() throws IOEx
         assertEquals(200, yammerMetricValue(metricName),
                 "Metric should be updated in finally block even when exception occurs");
     }
+    
+    @Test
+    public void testReadWithMinMessage() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()

Review Comment:
   We don't use `var` anywhere else in this file, or even in this module, so maybe keep the explicit types for consistency.
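
   For example, a quick sketch of the suggestion with the declared types spelled out (assuming the builder returns a `LogConfig`):

   ```java
   // Same setup as in the test, just using explicit types instead of var.
   LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
           .segmentBytes(72)
           .build();
   log = createLog(logDir, logConfig);
   ```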



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1913,6 +1943,777 @@ public void testRetentionSizeInPercentMetricUpdatedOnDeletionError() throws IOEx
         assertEquals(200, yammerMetricValue(metricName),
                 "Metric should be updated in finally block even when exception occurs");
     }
+    
+    @Test
+    public void testReadWithMinMessage() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(72)
+                .build();
+        log = createLog(logDir, logConfig);
+        var messageIds = IntStream.concat(
+                IntStream.range(0, 50),
+                IntStream.iterate(50, i -> i < 200, i -> i + 7)
+        ).toArray();
+        var records = Arrays.stream(messageIds)
+                .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+                .toArray(SimpleRecord[]::new);
+
+        // now test the case that we give the offsets and use non-sequential offsets
+        for (var i = 0; i < records.length; i++) {
+            log.appendAsFollower(
+                    MemoryRecords.withRecords(messageIds[i], Compression.NONE, 0, records[i]),
+                    Integer.MAX_VALUE
+            );
+        }
+
+        var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+        for (var i = 50; i < maxMessageId; i++) {
+            var offset = i;
+            var idx = IntStream.range(0, messageIds.length)
+                    .filter(j -> messageIds[j] >= offset)
+                    .findFirst()
+                    .getAsInt();
+
+            var fetchResults = List.of(
+                    log.read(i, 1, FetchIsolation.LOG_END, true),
+                    log.read(i, 100000, FetchIsolation.LOG_END, true),
+                    log.read(i, 100, FetchIsolation.LOG_END, true)
+            );
+            for (var fetchDataInfo : fetchResults) {
+                var read = fetchDataInfo.records.records().iterator().next();
+                assertEquals(messageIds[idx], read.offset(), "Offset read 
should match message id.");
+                assertEquals(records[idx], new SimpleRecord(read), "Message 
should match appended.");
+            }
+        }
+    }
+
+    @Test
+    public void testReadWithTooSmallMaxLength() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(72)
+                .build();
+        log = createLog(logDir, logConfig);
+        var messageIds = IntStream.concat(
+                IntStream.range(0, 50),
+                IntStream.iterate(50, i -> i < 200, i -> i + 7)
+        ).toArray();
+        var records = Arrays.stream(messageIds)
+                .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+                .toArray(SimpleRecord[]::new);
+
+        // now test the case that we give the offsets and use non-sequential offsets
+        for (var i = 0; i < records.length; i++) {
+            log.appendAsFollower(
+                    MemoryRecords.withRecords(messageIds[i], Compression.NONE, 0, records[i]),
+                    Integer.MAX_VALUE
+            );
+        }
+
+        var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+        for (var i = 50; i < maxMessageId; i++) {
+            assertEquals(MemoryRecords.EMPTY, log.read(i, 0, FetchIsolation.LOG_END, false).records);
+
+            // we return an incomplete message instead of an empty one for the case below
+            // we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is
+            // larger than the fetch size
+            // in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty
+            // partition
+            var fetchInfo = log.read(i, 1, FetchIsolation.LOG_END, false);
+            assertTrue(fetchInfo.firstEntryIncomplete);
+            assertInstanceOf(FileRecords.class, fetchInfo.records);
+            assertEquals(1, fetchInfo.records.sizeInBytes());
+        }
+    }
+
+    /**
+     * Test reading at the boundary of the log, specifically
+     * - reading from the logEndOffset should give an empty message set
+     * - reading from the maxOffset should give an empty message set
+     * - reading beyond the log end offset should throw an OffsetOutOfRangeException
+     */
+    @Test
+    public void testReadOutOfRange() throws IOException {
+        // create empty log files to simulate a log starting at offset 1024
+        Files.createFile(LogFileUtils.logFile(logDir, 1024).toPath());
+        Files.createFile(LogFileUtils.offsetIndexFile(logDir, 1024).toPath());
+
+        // set up replica log starting with offset 1024 and with one message (at offset 1024)
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1024)
+                .build();
+        log = createLog(logDir, logConfig);
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("42".getBytes())), 0);
+
+        assertEquals(
+                0,
+                log.read(1025, 1000, FetchIsolation.LOG_END, true).records.sizeInBytes(),
+                "Reading at the log end offset should produce 0 byte read."
+        );
+
+        assertThrows(OffsetOutOfRangeException.class, () -> log.read(0, 1000, FetchIsolation.LOG_END, true));
+        assertThrows(OffsetOutOfRangeException.class, () -> log.read(1026, 1000, FetchIsolation.LOG_END, true));
+    }
+
+    @Test
+    public void testFlushingEmptyActiveSegments() throws IOException {
+        log = createLog(logDir, new LogConfig(new Properties()));
+        var message = MemoryRecords.withRecords(
+                Compression.NONE,
+                new SimpleRecord(mockTime.milliseconds(), null, "Test".getBytes())
+        );
+        
+        log.appendAsLeader(message, 0);
+        log.roll();
+        assertEquals(2, logDir.listFiles(f -> f.getName().endsWith(".log")).length);
+        assertEquals(1, logDir.listFiles(f -> f.getName().endsWith(".index")).length);
+        assertEquals(0, log.activeSegment().size());
+        log.flush(true);
+        assertEquals(2, logDir.listFiles(f -> f.getName().endsWith(".log")).length);
+        assertEquals(2, logDir.listFiles(f -> f.getName().endsWith(".index")).length);
+    }
+
+    /**
+     * Test that covers reads and writes on a multisegment log. This test appends a bunch of messages
+     * and then reads them all back and checks that the message read and offset matches what was appended.
+     */
+    @Test
+    public void testLogRolls() throws IOException, InterruptedException {
+        // create a multipart log with 100 messages
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(100)
+                .build();
+        log = createLog(logDir, logConfig);
+        var numMessages = 100;
+        var messageSets = IntStream.range(0, numMessages)
+                .mapToObj(i -> MemoryRecords.withRecords(
+                        Compression.NONE,
+                        new SimpleRecord(mockTime.milliseconds(), null, String.valueOf(i).getBytes()))
+                ).toArray(MemoryRecords[]::new);
+        for (var messageSet : messageSets) {
+            log.appendAsLeader(messageSet, 0);
+        }
+        log.flush(false);
+
+        // do successive reads to ensure all our messages are there
+        var offset = 0L;
+        for (var i = 0; i < numMessages; i++) {
+            var batches = log.read(offset, 1024 * 1024, FetchIsolation.LOG_END, true).records.batches();
+            var head = batches.iterator().next();
+            assertEquals(offset, head.lastOffset(), "Offsets not equal");
+
+            var expected = messageSets[i].records().iterator().next();
+            var actual = head.iterator().next();
+            assertEquals(expected.key(), actual.key(), "Keys not equal at 
offset " + offset);
+            assertEquals(expected.value(), actual.value(), "Values not equal 
at offset " + offset);
+            assertEquals(expected.timestamp(), actual.timestamp(), "Timestamps 
not equal at offset " + offset);
+            offset = head.lastOffset() + 1;
+        }
+        var lastRead = log.read(numMessages, 1024 * 1024, FetchIsolation.LOG_END, true).records;
+        assertFalse(lastRead.records().iterator().hasNext(), "Should be no more messages");
+
+        // check that rolling the log forced a flush, the flush is async so retry in case of failure
+        TestUtils.retryOnExceptionWithTimeout(1000L, () ->
+                assertTrue(log.recoveryPoint() >= log.activeSegment().baseOffset(), "Log roll should have forced flush")
+        );
+    }
+
+    /**
+     * Test reads at offsets that fall within compressed message set boundaries.
+     */
+    @Test
+    public void testCompressedMessages() throws IOException {
+        // this log should roll after every message set
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(110)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        // append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+                new SimpleRecord("hello".getBytes()), new SimpleRecord("there".getBytes())), 0);
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+                new SimpleRecord("alpha".getBytes()), new SimpleRecord("beta".getBytes())), 0);
+
+        // we should always get the first message in the compressed set when reading any offset in the set
+        assertEquals(0, read(log, 0).iterator().next().offset(), "Read at offset 0 should produce 0");
+        assertEquals(0, read(log, 1).iterator().next().offset(), "Read at offset 1 should produce 0");
+        assertEquals(2, read(log, 2).iterator().next().offset(), "Read at offset 2 should produce 2");
+        assertEquals(2, read(log, 3).iterator().next().offset(), "Read at offset 3 should produce 2");
+    }
+
+    private Iterable<Record> read(UnifiedLog log, long offset) throws IOException {
+        return log.read(offset, 4096, FetchIsolation.LOG_END, true).records.records();
+    }
+
+    /**
+     * Test garbage collecting old segments
+     */
+    @Test
+    public void testThatGarbageCollectingSegmentsDoesntChangeOffset() throws IOException {
+        for (int messagesToAppend : List.of(0, 1, 25)) {
+            logDir.mkdirs();
+            // first test a log segment starting at 0
+            var logConfig = new LogTestUtils.LogConfigBuilder()
+                    .segmentBytes(100)
+                    .retentionMs(0)
+                    .build();
+            var testLog = createLog(logDir, logConfig);
+            for (int i = 0; i < messagesToAppend; i++) {
+                testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+                        new SimpleRecord(mockTime.milliseconds() - 10, null, String.valueOf(i).getBytes())), 0);
+            }
+
+            var currOffset = testLog.logEndOffset();
+            assertEquals(currOffset, messagesToAppend);
+
+            // time goes by; the log file is deleted
+            testLog.updateHighWatermark(currOffset);
+            testLog.deleteOldSegments();
+
+            assertEquals(currOffset, testLog.logEndOffset(), "Deleting 
segments shouldn't have changed the logEndOffset");
+            assertEquals(1, testLog.numberOfSegments(), "We should still have 
one segment left");
+            assertEquals(0, testLog.deleteOldSegments(), "Further collection 
shouldn't delete anything");
+            assertEquals(currOffset, testLog.logEndOffset(), "Still no change 
in the logEndOffset");
+            assertEquals(currOffset,
+                    testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+                            new SimpleRecord(mockTime.milliseconds(), null, "hello".getBytes())), 0).firstOffset(),
+                    "Should still be able to append and should get the logEndOffset assigned to the new append");
+
+            // cleanup the log
+            logsToClose.remove(testLog);
+            testLog.delete();
+        }
+    }
+
+    /**
+     * MessageSet size shouldn't exceed the config.segmentSize, check that it is properly enforced by
+     * appending a message set larger than the config.segmentSize setting and checking that an exception is thrown.
+     */
+    @Test
+    public void testMessageSetSizeCheck() throws IOException {
+        var messageSet = MemoryRecords.withRecords(Compression.NONE,
+                new SimpleRecord("You".getBytes()), new 
SimpleRecord("bethe".getBytes()));
+        // append messages to log
+        var configSegmentSize = messageSet.sizeInBytes() - 1;
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(configSegmentSize)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        assertThrows(RecordBatchTooLargeException.class, () -> log.appendAsLeader(messageSet, 0));
+    }
+
+    @Test
+    public void testCompactedTopicConstraints() throws IOException {
+        var keyedMessage = new SimpleRecord("and here it is".getBytes(), "this 
message has a key".getBytes());
+        var anotherKeyedMessage = new SimpleRecord("another key".getBytes(), 
"this message also has a key".getBytes());
+        var unkeyedMessage = new SimpleRecord("this message does not have a 
key".getBytes());
+
+        var messageSetWithUnkeyedMessage = MemoryRecords.withRecords(Compression.NONE, unkeyedMessage, keyedMessage);
+        var messageSetWithOneUnkeyedMessage = MemoryRecords.withRecords(Compression.NONE, unkeyedMessage);
+        var messageSetWithCompressedKeyedMessage = MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage);
+        var messageSetWithCompressedUnkeyedMessage = MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage, unkeyedMessage);
+        var messageSetWithKeyedMessage = MemoryRecords.withRecords(Compression.NONE, keyedMessage);
+        var messageSetWithKeyedMessages = MemoryRecords.withRecords(Compression.NONE, keyedMessage, anotherKeyedMessage);
+
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        var errorMsgPrefix = "Compacted topic cannot accept message without 
key";
+
+        var e = assertThrows(RecordValidationException.class,
+                () -> log.appendAsLeader(messageSetWithUnkeyedMessage, 0));
+        assertInstanceOf(InvalidRecordException.class, e.invalidException());
+        assertEquals(1, e.recordErrors().size());
+        assertEquals(0, e.recordErrors().get(0).batchIndex);
+        assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+        e = assertThrows(RecordValidationException.class,
+                () -> log.appendAsLeader(messageSetWithOneUnkeyedMessage, 0));
+        assertInstanceOf(InvalidRecordException.class, e.invalidException());
+        assertEquals(1, e.recordErrors().size());
+        assertEquals(0, e.recordErrors().get(0).batchIndex);
+        assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+        e = assertThrows(RecordValidationException.class,
+                () -> log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, 0));
+        assertInstanceOf(InvalidRecordException.class, e.invalidException());
+        assertEquals(1, e.recordErrors().size());
+        assertEquals(1, e.recordErrors().get(0).batchIndex);  // batch index is 1
+        assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+        // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
+        assertEquals(1, KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+                .filter(k -> k.getMBeanName().endsWith(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC))
+                .count());
+        assertTrue(meterCount(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC) > 0);
+
+        // the following should succeed without any InvalidMessageException
+        log.appendAsLeader(messageSetWithKeyedMessage, 0);
+        log.appendAsLeader(messageSetWithKeyedMessages, 0);
+        log.appendAsLeader(messageSetWithCompressedKeyedMessage, 0);
+    }
+
+    /**
+     * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
+     * setting and checking that an exception is thrown.
+     */
+    @Test
+    public void testMessageSizeCheck() throws IOException {
+        var first = MemoryRecords.withRecords(Compression.NONE,
+                new SimpleRecord("You".getBytes()), new 
SimpleRecord("bethe".getBytes()));
+        var second = MemoryRecords.withRecords(Compression.NONE,
+                new SimpleRecord("change (I need more bytes)... blah blah 
blah.".getBytes()),
+                new SimpleRecord("More padding boo hoo".getBytes()));
+
+        // append messages to log
+        var maxMessageSize = second.sizeInBytes() - 1;
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .maxMessageBytes(maxMessageSize)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        // should be able to append the small message
+        log.appendAsLeader(first, 0);
+
+        assertThrows(
+                RecordTooLargeException.class, 
+                () -> log.appendAsLeader(second, 0),
+                "Second message set should throw MessageSizeTooLargeException."
+        );
+    }
+
+    @Test
+    public void testMessageSizeCheckInAppendAsFollower() throws IOException {
+        var first = MemoryRecords.withRecords(0, Compression.NONE, 0,
+                new SimpleRecord("You".getBytes()), new 
SimpleRecord("bethe".getBytes()));
+        var second = MemoryRecords.withRecords(5, Compression.NONE, 0,
+                new SimpleRecord("change (I need more bytes)... blah blah 
blah.".getBytes()),
+                new SimpleRecord("More padding boo hoo".getBytes()));
+
+        log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .maxMessageBytes(second.sizeInBytes() - 1)
+                .build());
+
+        log.appendAsFollower(first, Integer.MAX_VALUE);
+        // the second record is larger than limit but appendAsFollower does not validate the size.
+        log.appendAsFollower(second, Integer.MAX_VALUE);
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(InvalidMemoryRecordsProvider.class)
+    public void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) throws IOException {
+        log = createLog(logDir, new LogConfig(new Properties()));
+        var previousEndOffset = log.logEndOffsetMetadata().messageOffset;
+
+        if (expectedException.isPresent()) {
+            assertThrows(expectedException.get(), () -> log.appendAsFollower(records, Integer.MAX_VALUE));
+        } else {
+            log.appendAsFollower(records, Integer.MAX_VALUE);
+        }
+
+        assertEquals(previousEndOffset, log.logEndOffsetMetadata().messageOffset);
+    }
+
+    @Test
+    public void testRandomRecords() throws IOException {
+        var random = new java.util.Random();

Review Comment:
   We can import `Random` rather than using the fully qualified `java.util.Random` here.
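
   Something like (sketch only):

   ```java
   import java.util.Random;
   // ...
   Random random = new Random();
   ```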



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1913,6 +1943,777 @@ public void testRetentionSizeInPercentMetricUpdatedOnDeletionError() throws IOEx
         assertEquals(200, yammerMetricValue(metricName),
                 "Metric should be updated in finally block even when exception occurs");
     }
+    
+    @Test
+    public void testReadWithMinMessage() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(72)
+                .build();
+        log = createLog(logDir, logConfig);
+        var messageIds = IntStream.concat(
+                IntStream.range(0, 50),
+                IntStream.iterate(50, i -> i < 200, i -> i + 7)
+        ).toArray();
+        var records = Arrays.stream(messageIds)
+                .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+                .toArray(SimpleRecord[]::new);
+
+        // now test the case that we give the offsets and use non-sequential offsets
+        for (var i = 0; i < records.length; i++) {
+            log.appendAsFollower(
+                    MemoryRecords.withRecords(messageIds[i], Compression.NONE, 0, records[i]),
+                    Integer.MAX_VALUE
+            );
+        }
+
+        var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+        for (var i = 50; i < maxMessageId; i++) {
+            var offset = i;
+            var idx = IntStream.range(0, messageIds.length)
+                    .filter(j -> messageIds[j] >= offset)
+                    .findFirst()
+                    .getAsInt();
+
+            var fetchResults = List.of(
+                    log.read(i, 1, FetchIsolation.LOG_END, true),
+                    log.read(i, 100000, FetchIsolation.LOG_END, true),
+                    log.read(i, 100, FetchIsolation.LOG_END, true)
+            );
+            for (var fetchDataInfo : fetchResults) {
+                var read = fetchDataInfo.records.records().iterator().next();
+                assertEquals(messageIds[idx], read.offset(), "Offset read 
should match message id.");
+                assertEquals(records[idx], new SimpleRecord(read), "Message 
should match appended.");
+            }
+        }
+    }
+
+    @Test
+    public void testReadWithTooSmallMaxLength() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(72)
+                .build();
+        log = createLog(logDir, logConfig);
+        var messageIds = IntStream.concat(
+                IntStream.range(0, 50),
+                IntStream.iterate(50, i -> i < 200, i -> i + 7)
+        ).toArray();
+        var records = Arrays.stream(messageIds)
+                .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+                .toArray(SimpleRecord[]::new);
+
+        // now test the case that we give the offsets and use non-sequential offsets
+        for (var i = 0; i < records.length; i++) {
+            log.appendAsFollower(
+                    MemoryRecords.withRecords(messageIds[i], Compression.NONE, 0, records[i]),
+                    Integer.MAX_VALUE
+            );
+        }
+
+        var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+        for (var i = 50; i < maxMessageId; i++) {
+            assertEquals(MemoryRecords.EMPTY, log.read(i, 0, FetchIsolation.LOG_END, false).records);
+
+            // we return an incomplete message instead of an empty one for the case below
+            // we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is
+            // larger than the fetch size
+            // in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty
+            // partition
+            var fetchInfo = log.read(i, 1, FetchIsolation.LOG_END, false);
+            assertTrue(fetchInfo.firstEntryIncomplete);
+            assertInstanceOf(FileRecords.class, fetchInfo.records);
+            assertEquals(1, fetchInfo.records.sizeInBytes());
+        }
+    }
+
+    /**
+     * Test reading at the boundary of the log, specifically
+     * - reading from the logEndOffset should give an empty message set
+     * - reading from the maxOffset should give an empty message set
+     * - reading beyond the log end offset should throw an OffsetOutOfRangeException
+     */
+    @Test
+    public void testReadOutOfRange() throws IOException {
+        // create empty log files to simulate a log starting at offset 1024
+        Files.createFile(LogFileUtils.logFile(logDir, 1024).toPath());
+        Files.createFile(LogFileUtils.offsetIndexFile(logDir, 1024).toPath());
+
+        // set up replica log starting with offset 1024 and with one message (at offset 1024)
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1024)
+                .build();
+        log = createLog(logDir, logConfig);
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("42".getBytes())), 0);
+
+        assertEquals(
+                0,
+                log.read(1025, 1000, FetchIsolation.LOG_END, true).records.sizeInBytes(),
+                "Reading at the log end offset should produce 0 byte read."
+        );
+
+        assertThrows(OffsetOutOfRangeException.class, () -> log.read(0, 1000, FetchIsolation.LOG_END, true));
+        assertThrows(OffsetOutOfRangeException.class, () -> log.read(1026, 1000, FetchIsolation.LOG_END, true));
+    }
+
+    @Test
+    public void testFlushingEmptyActiveSegments() throws IOException {
+        log = createLog(logDir, new LogConfig(new Properties()));
+        var message = MemoryRecords.withRecords(
+                Compression.NONE,
+                new SimpleRecord(mockTime.milliseconds(), null, "Test".getBytes())
+        );
+        
+        log.appendAsLeader(message, 0);
+        log.roll();
+        assertEquals(2, logDir.listFiles(f -> f.getName().endsWith(".log")).length);
+        assertEquals(1, logDir.listFiles(f -> f.getName().endsWith(".index")).length);
+        assertEquals(0, log.activeSegment().size());
+        log.flush(true);
+        assertEquals(2, logDir.listFiles(f -> f.getName().endsWith(".log")).length);
+        assertEquals(2, logDir.listFiles(f -> f.getName().endsWith(".index")).length);
+    }
+
+    /**
+     * Test that covers reads and writes on a multisegment log. This test appends a bunch of messages
+     * and then reads them all back and checks that the message read and offset matches what was appended.
+     */
+    @Test
+    public void testLogRolls() throws IOException, InterruptedException {
+        // create a multipart log with 100 messages
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(100)
+                .build();
+        log = createLog(logDir, logConfig);
+        var numMessages = 100;
+        var messageSets = IntStream.range(0, numMessages)
+                .mapToObj(i -> MemoryRecords.withRecords(
+                        Compression.NONE,
+                        new SimpleRecord(mockTime.milliseconds(), null, String.valueOf(i).getBytes()))
+                ).toArray(MemoryRecords[]::new);
+        for (var messageSet : messageSets) {
+            log.appendAsLeader(messageSet, 0);
+        }
+        log.flush(false);
+
+        // do successive reads to ensure all our messages are there
+        var offset = 0L;
+        for (var i = 0; i < numMessages; i++) {
+            var batches = log.read(offset, 1024 * 1024, FetchIsolation.LOG_END, true).records.batches();
+            var head = batches.iterator().next();
+            assertEquals(offset, head.lastOffset(), "Offsets not equal");
+
+            var expected = messageSets[i].records().iterator().next();
+            var actual = head.iterator().next();
+            assertEquals(expected.key(), actual.key(), "Keys not equal at 
offset " + offset);
+            assertEquals(expected.value(), actual.value(), "Values not equal 
at offset " + offset);
+            assertEquals(expected.timestamp(), actual.timestamp(), "Timestamps 
not equal at offset " + offset);
+            offset = head.lastOffset() + 1;
+        }
+        var lastRead = log.read(numMessages, 1024 * 1024, FetchIsolation.LOG_END, true).records;
+        assertFalse(lastRead.records().iterator().hasNext(), "Should be no more messages");
+
+        // check that rolling the log forced a flush, the flush is async so retry in case of failure
+        TestUtils.retryOnExceptionWithTimeout(1000L, () ->
+                assertTrue(log.recoveryPoint() >= log.activeSegment().baseOffset(), "Log roll should have forced flush")
+        );
+    }
+
+    /**
+     * Test reads at offsets that fall within compressed message set boundaries.
+     */
+    @Test
+    public void testCompressedMessages() throws IOException {
+        // this log should roll after every message set
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(110)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        // append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+                new SimpleRecord("hello".getBytes()), new SimpleRecord("there".getBytes())), 0);
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+                new SimpleRecord("alpha".getBytes()), new SimpleRecord("beta".getBytes())), 0);
+
+        // we should always get the first message in the compressed set when reading any offset in the set
+        assertEquals(0, read(log, 0).iterator().next().offset(), "Read at offset 0 should produce 0");
+        assertEquals(0, read(log, 1).iterator().next().offset(), "Read at offset 1 should produce 0");
+        assertEquals(2, read(log, 2).iterator().next().offset(), "Read at offset 2 should produce 2");
+        assertEquals(2, read(log, 3).iterator().next().offset(), "Read at offset 3 should produce 2");
+    }
+
+    private Iterable<Record> read(UnifiedLog log, long offset) throws IOException {
+        return log.read(offset, 4096, FetchIsolation.LOG_END, true).records.records();
+    }
+
+    /**
+     * Test garbage collecting old segments
+     */
+    @Test
+    public void testThatGarbageCollectingSegmentsDoesntChangeOffset() throws IOException {
+        for (int messagesToAppend : List.of(0, 1, 25)) {
+            logDir.mkdirs();
+            // first test a log segment starting at 0
+            var logConfig = new LogTestUtils.LogConfigBuilder()
+                    .segmentBytes(100)
+                    .retentionMs(0)
+                    .build();
+            var testLog = createLog(logDir, logConfig);
+            for (int i = 0; i < messagesToAppend; i++) {
+                testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+                        new SimpleRecord(mockTime.milliseconds() - 10, null, String.valueOf(i).getBytes())), 0);
+            }
+
+            var currOffset = testLog.logEndOffset();
+            assertEquals(currOffset, messagesToAppend);
+
+            // time goes by; the log file is deleted
+            testLog.updateHighWatermark(currOffset);
+            testLog.deleteOldSegments();
+
+            assertEquals(currOffset, testLog.logEndOffset(), "Deleting 
segments shouldn't have changed the logEndOffset");
+            assertEquals(1, testLog.numberOfSegments(), "We should still have 
one segment left");
+            assertEquals(0, testLog.deleteOldSegments(), "Further collection 
shouldn't delete anything");
+            assertEquals(currOffset, testLog.logEndOffset(), "Still no change 
in the logEndOffset");
+            assertEquals(currOffset,
+                    testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+                            new SimpleRecord(mockTime.milliseconds(), null, "hello".getBytes())), 0).firstOffset(),
+                    "Should still be able to append and should get the logEndOffset assigned to the new append");
+
+            // cleanup the log
+            logsToClose.remove(testLog);
+            testLog.delete();
+        }
+    }
+
+    /**
+     * MessageSet size shouldn't exceed the config.segmentSize, check that it is properly enforced by
+     * appending a message set larger than the config.segmentSize setting and checking that an exception is thrown.
+     */
+    @Test
+    public void testMessageSetSizeCheck() throws IOException {
+        var messageSet = MemoryRecords.withRecords(Compression.NONE,
+                new SimpleRecord("You".getBytes()), new 
SimpleRecord("bethe".getBytes()));
+        // append messages to log
+        var configSegmentSize = messageSet.sizeInBytes() - 1;
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(configSegmentSize)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        assertThrows(RecordBatchTooLargeException.class, () -> log.appendAsLeader(messageSet, 0));
+    }
+
+    @Test
+    public void testCompactedTopicConstraints() throws IOException {
+        var keyedMessage = new SimpleRecord("and here it is".getBytes(), "this 
message has a key".getBytes());
+        var anotherKeyedMessage = new SimpleRecord("another key".getBytes(), 
"this message also has a key".getBytes());
+        var unkeyedMessage = new SimpleRecord("this message does not have a 
key".getBytes());
+
+        var messageSetWithUnkeyedMessage = MemoryRecords.withRecords(Compression.NONE, unkeyedMessage, keyedMessage);
+        var messageSetWithOneUnkeyedMessage = MemoryRecords.withRecords(Compression.NONE, unkeyedMessage);
+        var messageSetWithCompressedKeyedMessage = MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage);
+        var messageSetWithCompressedUnkeyedMessage = MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage, unkeyedMessage);
+        var messageSetWithKeyedMessage = MemoryRecords.withRecords(Compression.NONE, keyedMessage);
+        var messageSetWithKeyedMessages = MemoryRecords.withRecords(Compression.NONE, keyedMessage, anotherKeyedMessage);
+
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        var errorMsgPrefix = "Compacted topic cannot accept message without 
key";
+
+        var e = assertThrows(RecordValidationException.class,
+                () -> log.appendAsLeader(messageSetWithUnkeyedMessage, 0));
+        assertInstanceOf(InvalidRecordException.class, e.invalidException());
+        assertEquals(1, e.recordErrors().size());
+        assertEquals(0, e.recordErrors().get(0).batchIndex);
+        assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+        e = assertThrows(RecordValidationException.class,
+                () -> log.appendAsLeader(messageSetWithOneUnkeyedMessage, 0));
+        assertInstanceOf(InvalidRecordException.class, e.invalidException());
+        assertEquals(1, e.recordErrors().size());
+        assertEquals(0, e.recordErrors().get(0).batchIndex);
+        assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+        e = assertThrows(RecordValidationException.class,
+                () -> log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, 0));
+        assertInstanceOf(InvalidRecordException.class, e.invalidException());
+        assertEquals(1, e.recordErrors().size());
+        assertEquals(1, e.recordErrors().get(0).batchIndex);  // batch index is 1
+        assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+        // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
+        assertEquals(1, KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+                .filter(k -> k.getMBeanName().endsWith(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC))
+                .count());
+        assertTrue(meterCount(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC) > 0);
+
+        // the following should succeed without any InvalidMessageException
+        log.appendAsLeader(messageSetWithKeyedMessage, 0);
+        log.appendAsLeader(messageSetWithKeyedMessages, 0);
+        log.appendAsLeader(messageSetWithCompressedKeyedMessage, 0);
+    }
+
+    /**
+     * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
+     * setting and checking that an exception is thrown.
+     */
+    @Test
+    public void testMessageSizeCheck() throws IOException {
+        var first = MemoryRecords.withRecords(Compression.NONE,
+                new SimpleRecord("You".getBytes()), new 
SimpleRecord("bethe".getBytes()));
+        var second = MemoryRecords.withRecords(Compression.NONE,
+                new SimpleRecord("change (I need more bytes)... blah blah 
blah.".getBytes()),
+                new SimpleRecord("More padding boo hoo".getBytes()));
+
+        // append messages to log
+        var maxMessageSize = second.sizeInBytes() - 1;
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .maxMessageBytes(maxMessageSize)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        // should be able to append the small message
+        log.appendAsLeader(first, 0);
+
+        assertThrows(
+                RecordTooLargeException.class, 
+                () -> log.appendAsLeader(second, 0),
+                "Second message set should throw MessageSizeTooLargeException."
+        );
+    }
+
+    @Test
+    public void testMessageSizeCheckInAppendAsFollower() throws IOException {
+        var first = MemoryRecords.withRecords(0, Compression.NONE, 0,
+                new SimpleRecord("You".getBytes()), new 
SimpleRecord("bethe".getBytes()));
+        var second = MemoryRecords.withRecords(5, Compression.NONE, 0,
+                new SimpleRecord("change (I need more bytes)... blah blah 
blah.".getBytes()),
+                new SimpleRecord("More padding boo hoo".getBytes()));
+
+        log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .maxMessageBytes(second.sizeInBytes() - 1)
+                .build());
+
+        log.appendAsFollower(first, Integer.MAX_VALUE);
+        // the second record is larger than limit but appendAsFollower does not validate the size.
+        log.appendAsFollower(second, Integer.MAX_VALUE);
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(InvalidMemoryRecordsProvider.class)
+    public void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) throws IOException {
+        log = createLog(logDir, new LogConfig(new Properties()));
+        var previousEndOffset = log.logEndOffsetMetadata().messageOffset;
+
+        if (expectedException.isPresent()) {
+            assertThrows(expectedException.get(), () -> log.appendAsFollower(records, Integer.MAX_VALUE));
+        } else {
+            log.appendAsFollower(records, Integer.MAX_VALUE);
+        }
+
+        assertEquals(previousEndOffset, log.logEndOffsetMetadata().messageOffset);
+    }
+
+    @Test
+    public void testRandomRecords() throws IOException {
+        var random = new java.util.Random();
+        for (int i = 0; i < 100; i++) {
+            var size = random.nextInt(128) + 1;
+            var bytes = new byte[size];
+            random.nextBytes(bytes);
+            var records = MemoryRecords.readableRecords(ByteBuffer.wrap(bytes));
+
+            var tempDir = TestUtils.tempDirectory();
+            var randomLogDir = TestUtils.randomPartitionLogDir(tempDir);
+            var testLog = createLog(randomLogDir, new LogConfig(new Properties()));
+            try {
+                var previousEndOffset = testLog.logEndOffsetMetadata().messageOffset;
+
+                // Depending on the corruption, unified log sometimes throws and sometimes returns an
+                // empty set of batches
+                assertThrows(CorruptRecordException.class, () -> {
+                    var info = testLog.appendAsFollower(records, Integer.MAX_VALUE);
+                    if (info.firstOffset() == UnifiedLog.UNKNOWN_OFFSET) {
+                        throw new CorruptRecordException("Unknown offset is 
test");
+                    }
+                });
+
+                assertEquals(previousEndOffset, testLog.logEndOffsetMetadata().messageOffset);
+            } finally {
+                logsToClose.remove(testLog);
+                testLog.close();
+                Utils.delete(tempDir);
+            }
+        }
+    }
+
+    @Test
+    public void testInvalidLeaderEpoch() throws IOException {
+        log = createLog(logDir, new LogConfig(new Properties()));
+        var previousEndOffset = log.logEndOffsetMetadata().messageOffset;
+        var epoch = log.latestEpoch().orElse(0) + 1;
+        var numberOfRecords = 10;
+
+        var recordsForBatch = IntStream.range(0, numberOfRecords)
+                .mapToObj(n -> new SimpleRecord(String.valueOf(n).getBytes()))
+                .toArray(SimpleRecord[]::new);
+
+        var batchWithValidEpoch = MemoryRecords.withRecords(
+                previousEndOffset, Compression.NONE, epoch, recordsForBatch);
+
+        var batchWithInvalidEpoch = MemoryRecords.withRecords(
+                previousEndOffset + numberOfRecords, Compression.NONE, epoch + 1, recordsForBatch);
+
+        var buffer = ByteBuffer.allocate(batchWithValidEpoch.sizeInBytes() + batchWithInvalidEpoch.sizeInBytes());
+        buffer.put(batchWithValidEpoch.buffer());
+        buffer.put(batchWithInvalidEpoch.buffer());
+        buffer.flip();
+
+        var combinedRecords = MemoryRecords.readableRecords(buffer);
+        log.appendAsFollower(combinedRecords, epoch);
+
+        // Check that only the first batch was appended
+        assertEquals(previousEndOffset + numberOfRecords, log.logEndOffsetMetadata().messageOffset);
+        // Check that the last fetched epoch matches the first batch
+        assertEquals(epoch, (int) log.latestEpoch().get());
+    }
+
+    @Test
+    public void testLogFlushesPartitionMetadataOnAppend() throws IOException {
+        log = createLog(logDir, new LogConfig(new Properties()));
+        var record = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("simpleValue".getBytes()));
+
+        var topicId = Uuid.randomUuid();
+        log.partitionMetadataFile().get().record(topicId);
+
+        // Should trigger a synchronous flush
+        log.appendAsLeader(record, 0);
+        assertTrue(log.partitionMetadataFile().get().exists());
+        assertEquals(topicId, log.partitionMetadataFile().get().read().topicId());
+    }
+
+    @Test
+    public void testLogFlushesPartitionMetadataOnClose() throws IOException {
+        var logConfig = new LogConfig(new Properties());
+        var firstLog = createLog(logDir, logConfig);
+        var topicId = Uuid.randomUuid();
+        firstLog.partitionMetadataFile().get().record(topicId);
+
+        // Should trigger a synchronous flush
+        firstLog.close();
+
+        // We open the log again, and the partition metadata file should exist with the same ID.
+        log = createLog(logDir, logConfig);
+        assertTrue(log.partitionMetadataFile().get().exists());
+        assertEquals(topicId, log.partitionMetadataFile().get().read().topicId());
+    }
+
+    @Test
+    public void testLogRecoversTopicId() throws IOException {
+        var logConfig = new LogConfig(new Properties());
+        var firstLog = createLog(logDir, logConfig);
+        var topicId = Uuid.randomUuid();
+        firstLog.assignTopicId(topicId);
+        firstLog.close();
+
+        // test recovery case
+        log = createLog(logDir, logConfig);
+        assertTrue(log.topicId().isPresent());
+        assertEquals(topicId, log.topicId().get());
+    }
+
+    @Test
+    public void testLogFailsWhenInconsistentTopicIdSet() throws IOException {
+        var logConfig = new LogConfig(new Properties());
+        var firstLog = createLog(logDir, logConfig);
+        var topicId = Uuid.randomUuid();
+        firstLog.assignTopicId(topicId);
+        firstLog.close();
+
+        // test creating a log with a new ID
+        assertThrows(InconsistentTopicIdException.class, () ->
+                createLog(logDir, logConfig, 0L, 0L, brokerTopicStats, mockTime.scheduler, mockTime,
+                        producerStateManagerConfig, false, Optional.of(Uuid.randomUuid()), false));
+    }
+
+    /**
+     * Test building the time index on the follower by setting assignOffsets to false.
+     */
+    @Test
+    public void testBuildTimeIndexWhenNotAssigningOffsets() throws IOException {
+        var numMessages = 100;
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(10000)
+                .indexIntervalBytes(1)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        for (int i = 0; i < numMessages; i++) {
+            log.appendAsFollower(
+                    MemoryRecords.withRecords(100 + i, Compression.NONE, 0,
+                            new SimpleRecord(mockTime.milliseconds() + i, String.valueOf(i).getBytes())),
+                    Integer.MAX_VALUE);
+        }
+
+        var timeIndexEntries = log.logSegments().stream()
+                .mapToInt(segment -> {
+                    try {
+                        return segment.timeIndex().entries();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }).sum();
+        assertEquals(numMessages - 1, timeIndexEntries,
+                "There should be " + (numMessages - 1) + " time index 
entries");
+        assertEquals(mockTime.milliseconds() + numMessages - 1,
+                log.activeSegment().timeIndex().lastEntry().timestamp(),
+                "The last time index entry should have timestamp " + 
(mockTime.milliseconds() + numMessages - 1));
+    }
+
+    @Test
+    public void testFetchOffsetByTimestampIncludesLeaderEpoch() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(200)
+                .indexIntervalBytes(1)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        assertEquals(new OffsetResultHolder(Optional.empty()),
+                log.fetchOffsetByTimestamp(0L, Optional.empty()));
+
+        var firstTimestamp = mockTime.milliseconds();
+        var firstLeaderEpoch = 0;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), firstTimestamp), firstLeaderEpoch);
+
+        var secondTimestamp = firstTimestamp + 1;
+        var secondLeaderEpoch = 1;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), secondTimestamp), secondLeaderEpoch);
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
+                log.fetchOffsetByTimestamp(firstTimestamp, Optional.empty()));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
+                log.fetchOffsetByTimestamp(secondTimestamp, Optional.empty()));
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Optional.empty()));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.empty()));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.empty()));
+
+        // The cache can be updated directly after a leader change.
+        // The new latest offset should reflect the updated epoch.
+        log.assignEpochStartOffset(2, 2L);
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.empty()));
+    }
+
+    @Test
+    public void testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(200)
+                .indexIntervalBytes(1)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        assertEquals(new OffsetResultHolder(Optional.empty()),
+                log.fetchOffsetByTimestamp(0L, Optional.empty()));
+
+        var firstTimestamp = mockTime.milliseconds();
+        var leaderEpoch = 0;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), firstTimestamp), leaderEpoch);
+
+        var secondTimestamp = firstTimestamp + 1;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), secondTimestamp), leaderEpoch);
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), firstTimestamp), leaderEpoch);
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP, Optional.empty()));
+    }
+
+    @Test
+    public void testFetchOffsetByTimestampFromRemoteStorage() throws Exception {
+        var config = createKafkaConfigWithRLM();
+        var purgatory = new DelayedOperationPurgatory<DelayedRemoteListOffsets>("RemoteListOffsets", config.brokerId());
+        var remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig(),
+                0,
+                logDir.getAbsolutePath(),
+                "clusterId",
+                mockTime,
+                tp -> Optional.empty(),
+                (tp, offset) -> { },
+                brokerTopicStats,
+                new Metrics(),
+                Optional.empty()));
+        remoteLogManager.setDelayedOperationPurgatory(purgatory);
+
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(200)
+                .indexIntervalBytes(1)
+                .remoteLogStorageEnable(true)
+                .build();
+        log = createLog(logDir, logConfig, true);
+
+        // Note that the log is empty, so remote offset read won't happen
+        assertEquals(new OffsetResultHolder(Optional.empty()),
+                log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)));
+
+        var firstTimestamp = mockTime.milliseconds();
+        var firstLeaderEpoch = 0;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), firstTimestamp), firstLeaderEpoch);
+
+        var secondTimestamp = firstTimestamp + 1;
+        var secondLeaderEpoch = 1;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), secondTimestamp), secondLeaderEpoch);
+
+        doAnswer(ans -> {
+            long timestamp = ans.getArgument(1);
+            return Optional.of(timestamp)
+                    .filter(t -> t == firstTimestamp)
+                    .map(t -> new FileRecords.TimestampAndOffset(t, 0L, Optional.of(firstLeaderEpoch)));
+        }).when(remoteLogManager).findOffsetByTimestamp(
+                eq(log.topicPartition()), anyLong(), anyLong(), eq(log.leaderEpochCache()));
+        log.updateLocalLogStartOffset(1);
+
+        // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
+        assertFetchOffsetByTimestamp(remoteLogManager, new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch)), firstTimestamp, log);
+        assertFetchOffsetByTimestamp(remoteLogManager, new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch)), secondTimestamp, log);
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Optional.of(remoteLogManager)));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)));
+
+        log.assignEpochStartOffset(2, 2L);
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)));
+    }
+
+    @Test
+    public void testFetchLatestTieredTimestampNoRemoteStorage() throws IOException {
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(200)
+                .indexIntervalBytes(1)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Optional.empty()));
+
+        var firstTimestamp = mockTime.milliseconds();
+        var leaderEpoch = 0;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), firstTimestamp), leaderEpoch);
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), firstTimestamp + 1), leaderEpoch);
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Optional.empty()));
+    }
+
+    @Test
+    public void testFetchLatestTieredTimestampWithRemoteStorage() throws Exception {
+        var config = createKafkaConfigWithRLM();
+        var purgatory = new DelayedOperationPurgatory<DelayedRemoteListOffsets>("RemoteListOffsets", config.brokerId());
+        var remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig(),
+                0,
+                logDir.getAbsolutePath(),
+                "clusterId",
+                mockTime,
+                tp -> Optional.empty(),
+                (tp, offset) -> { },
+                brokerTopicStats,
+                new Metrics(),
+                Optional.empty()));
+        remoteLogManager.setDelayedOperationPurgatory(purgatory);
+
+        var logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(200)
+                .indexIntervalBytes(1)
+                .remoteLogStorageEnable(true)
+                .build();
+        log = createLog(logDir, logConfig, true);
+
+        // Note that the log is empty, so remote offset read won't happen
+        assertEquals(new OffsetResultHolder(Optional.empty()),
+                log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)));
+
+        var firstTimestamp = mockTime.milliseconds();
+        var firstLeaderEpoch = 0;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), firstTimestamp), firstLeaderEpoch);
+
+        var secondTimestamp = firstTimestamp + 1;
+        var secondLeaderEpoch = 1;
+        log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), secondTimestamp), secondLeaderEpoch);
+
+        doAnswer(ans -> {
+            long timestamp = ans.getArgument(1);
+            return Optional.of(timestamp)
+                    .filter(t -> t == firstTimestamp)
+                    .map(t -> new FileRecords.TimestampAndOffset(t, 0L, Optional.of(firstLeaderEpoch)));
+        }).when(remoteLogManager).findOffsetByTimestamp(
+                eq(log.topicPartition()), anyLong(), anyLong(), eq(log.leaderEpochCache()));
+        log.updateLocalLogStartOffset(1);
+        log.updateHighestOffsetInRemoteStorage(0);
+
+        // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
+        assertFetchOffsetByTimestamp(remoteLogManager, new FileRecords.TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch)), firstTimestamp, log);
+        assertFetchOffsetByTimestamp(remoteLogManager, new FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch)), secondTimestamp, log);
+
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Optional.of(remoteLogManager)));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Optional.of(remoteLogManager)));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)));
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)));
+
+        log.assignEpochStartOffset(2, 2L);
+        assertEquals(new OffsetResultHolder(new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
+                log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)));
+    }
+
+    private void assertFetchOffsetByTimestamp(RemoteLogManager remoteLogManager,
+                                               FileRecords.TimestampAndOffset expected,
+                                               long timestamp,
+                                               UnifiedLog testLog) throws Exception {
+        OffsetResultHolder offsetResultHolder = testLog.fetchOffsetByTimestamp(timestamp, Optional.of(remoteLogManager));
+        assertTrue(offsetResultHolder.futureHolderOpt().isPresent());
+        offsetResultHolder.futureHolderOpt().get().taskFuture().get(1, TimeUnit.SECONDS);
+        assertTrue(offsetResultHolder.futureHolderOpt().get().taskFuture().isDone());
+        assertTrue(offsetResultHolder.futureHolderOpt().get().taskFuture().get().hasTimestampAndOffset());
+        assertEquals(expected, offsetResultHolder.futureHolderOpt().get().taskFuture().get().timestampAndOffset().orElse(null));
+    }
+
+    private KafkaConfig createKafkaConfigWithRLM() {

Review Comment:
   Could we get rid of this method? It seems all the tests only require a broker id and an instance of `RemoteLogManagerConfig`.
   
   My thinking is that this would remove the dependency on `KafkaConfig`.
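
   Rough sketch of what I mean (untested; it assumes `RemoteLogManagerConfig` can be built from an `AbstractConfig` carrying the same remote-storage properties this helper sets today, and it keeps the broker id as a plain constant):

   ```java
   // Hypothetical replacement for createKafkaConfigWithRLM(): build only the
   // RemoteLogManagerConfig the tests need, without going through KafkaConfig.
   private RemoteLogManagerConfig createRemoteLogManagerConfig() {
       Properties props = new Properties();
       // assumption: same remote storage properties as the current helper
       props.put("remote.log.storage.system.enable", "true");
       props.put("remote.log.storage.manager.class.name", "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager");
       props.put("remote.log.metadata.manager.class.name", "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager");
       return new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.configDef(), props));
   }
   ```

   The two remote-storage tests could then use a literal broker id (e.g. `0`) for the purgatory and pass this config straight to `RemoteLogManager`.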



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -41,20 +41,16 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.internals.utils.Throttler
-import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ArgumentsSource
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyLong}
 import org.mockito.Mockito.{doAnswer, doThrow, spy}
-import net.jqwik.api.AfterFailureMode
-import net.jqwik.api.ForAll
-import net.jqwik.api.Property

Review Comment:
   I don't think we use `net.jqwik` anywhere else in `core`. Can we remove `testImplementation libs.jqwik` from `build.gradle`?


