This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a950f45ea5 KAFKA-19752 Move UnifiedLogTest to storage module (1/N) 
(#20635)
3a950f45ea5 is described below

commit 3a950f45ea5ecfb81e4aa7de84e388ea6c68b38a
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Dec 24 18:26:51 2025 +0800

    KAFKA-19752 Move UnifiedLogTest to storage module (1/N) (#20635)
    
    This PR migrates several unit tests from UnifiedLogTest.scala to
    UnifiedLogTest.java as part of the ongoing effort to convert core broker
    logic and tests to Java.
    
    The following test methods have been rewritten and moved:
    - testDeleteOldSegments
    - testLogDeletionAfterClose
    - testLogDeletionAfterDeleteRecords
    - shouldDeleteSizeBasedSegments
    - shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize
    - shouldDeleteTimeBasedSegmentsReadyToBeDeleted
    - shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted
    - shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete
    - shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete
    - shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention
    - shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention
    - shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete
    - shouldApplyEpochToMessageOnAppendIfLeader
    - followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache
    - shouldTruncateLeaderEpochsWhenDeletingSegments
    - shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments
    - shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog
    - testFirstUnstableOffsetNoTransactionalData
    - testFirstUnstableOffsetWithTransactionalData
    
    New Utility Methods: To support the Java tests, new utility methods,
    including LogConfigBuilder and transactional append helpers, were added
    to LogTestUtils.java.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 519 +--------------
 .../kafka/storage/internals/log/LogTestUtils.java  | 122 ++++
 .../storage/internals/log/UnifiedLogTest.java      | 713 +++++++++++++++++++++
 .../org/apache/kafka/common/test/TestUtils.java    |   1 +
 4 files changed, 862 insertions(+), 493 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index d38e65e29f1..eb54319eb94 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -52,7 +52,6 @@ import org.junit.jupiter.params.provider.{EnumSource, 
ValueSource}
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyLong}
 import org.mockito.Mockito.{doAnswer, doThrow, spy}
-
 import net.jqwik.api.AfterFailureMode
 import net.jqwik.api.ForAll
 import net.jqwik.api.Property
@@ -1016,7 +1015,7 @@ class UnifiedLogTest {
     assertEquals(numProducerSnapshots, 
ProducerStateManager.listSnapshotFiles(logDir).size)
     // Sleep to breach the retention period
     mockTime.sleep(1000 * 60 + 1)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
     mockTime.sleep(1)
     assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
@@ -1065,7 +1064,7 @@ class UnifiedLogTest {
 
     // Increment the log start offset to exclude the first two segments.
     log.maybeIncrementLogStartOffset(log.logEndOffset - 1, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
     mockTime.sleep(1)
     assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
@@ -1103,7 +1102,8 @@ class UnifiedLogTest {
     // Clean segments, this should delete everything except the active segment 
since there only
     // exists the key "a".
     cleaner.clean(new LogToClean(log, 0, log.logEndOffset, false))
-    log.deleteOldSegments()
+    // There is no other key so we don't delete anything
+    assertEquals(0, log.deleteOldSegments())
     // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
     mockTime.sleep(1)
     
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
@@ -1166,7 +1166,7 @@ class UnifiedLogTest {
     assertEquals(util.Set.of(pid1, pid2), 
log.activeProducersWithLastSequence.keySet)
 
     log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
 
     // Producer state should not be removed when deleting log segment
     assertEquals(2, log.logSegments.size)
@@ -1547,7 +1547,7 @@ class UnifiedLogTest {
 
     log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(2L, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    log.deleteOldSegments() // force retention to kick in so that the snapshot 
files are cleaned up.
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")// force retention to kick in so that the snapshot files are cleaned 
up.
     mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so 
file deletion takes place
 
     // Deleting records should not remove producer state but should delete 
snapshots after the file deletion delay.
@@ -2723,7 +2723,7 @@ class UnifiedLogTest {
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.offsetIndexFile)
 
     log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
 
     assertEquals(1, log.numberOfSegments, "Only one segment should remain.")
     
assertTrue(segments.forall(_.log.file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
 &&
@@ -3015,476 +3015,8 @@ class UnifiedLogTest {
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
   }
 
-  @Test
-  def testDeleteOldSegments(): Unit = {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 100)
-      log.appendAsLeader(createRecords, 0)
-
-    log.assignEpochStartOffset(0, 40)
-    log.assignEpochStartOffset(1, 90)
-
-    // segments are not eligible for deletion if no high watermark has been set
-    val numSegments = log.numberOfSegments
-    log.deleteOldSegments()
-    assertEquals(numSegments, log.numberOfSegments)
-    assertEquals(0L, log.logStartOffset)
-
-    // only segments with offset before the current high watermark are 
eligible for deletion
-    for (hw <- 25 to 30) {
-      log.updateHighWatermark(hw)
-      log.deleteOldSegments()
-      assertTrue(log.logStartOffset <= hw)
-      log.logSegments.forEach { segment =>
-        val segmentFetchInfo = segment.read(segment.baseOffset, Int.MaxValue)
-        val segmentLastOffsetOpt = 
segmentFetchInfo.records.records.asScala.lastOption.map(_.offset)
-        segmentLastOffsetOpt.foreach { lastOffset =>
-          assertTrue(lastOffset >= hw)
-        }
-      }
-    }
-
-    // expire all segments
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(1, log.numberOfSegments, "The deleted segments should be 
gone.")
-    assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should 
have gone.")
-    assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries.get(0), 
"Epoch entry should be the latest epoch and the leo.")
-
-    // append some messages to create some segments
-    for (_ <- 0 until 100)
-      log.appendAsLeader(createRecords, 0)
-
-    log.delete()
-    assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
-    assertEquals(0, log.deleteOldSegments(), "The number of deleted segments 
should be zero.")
-    assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should 
have gone.")
-  }
-
-  @Test
-  def testLogDeletionAfterClose(): Unit = {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    log.appendAsLeader(createRecords, 0)
-
-    assertEquals(1, log.numberOfSegments, "The deleted segments should be 
gone.")
-    assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should 
have gone.")
-
-    log.close()
-    log.delete()
-    assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
-    assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should 
have gone.")
-  }
-
-  @Test
-  def testLogDeletionAfterDeleteRecords(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5)
-    val log = createLog(logDir, logConfig)
-
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-    assertEquals(3, log.numberOfSegments, "should have 3 segments")
-    assertEquals(log.logStartOffset, 0)
-    log.updateHighWatermark(log.logEndOffset)
-
-    log.maybeIncrementLogStartOffset(1, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    log.deleteOldSegments()
-    assertEquals(3, log.numberOfSegments, "should have 3 segments")
-    assertEquals(log.logStartOffset, 1)
-
-    log.maybeIncrementLogStartOffset(6, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    log.deleteOldSegments()
-    assertEquals(2, log.numberOfSegments, "should have 2 segments")
-    assertEquals(log.logStartOffset, 6)
-
-    log.maybeIncrementLogStartOffset(15, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    log.deleteOldSegments()
-    assertEquals(1, log.numberOfSegments, "should have 1 segments")
-    assertEquals(log.logStartOffset, 15)
-  }
-
   def epochCache(log: UnifiedLog): LeaderEpochFileCache = log.leaderEpochCache
 
-  @Test
-  def shouldDeleteSizeBasedSegments(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(2,log.numberOfSegments, "should have 2 segments")
-  }
-
-  @Test
-  def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(3,log.numberOfSegments, "should have 3 segments")
-  }
-
-  @Test
-  def shouldDeleteTimeBasedSegmentsReadyToBeDeleted(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp 
= 10)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(1, log.numberOfSegments, "There should be 1 segment 
remaining")
-  }
-
-  @Test
-  def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp 
= mockTime.milliseconds)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000000)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(3, log.numberOfSegments, "There should be 3 segments 
remaining")
-  }
-
-  @Test
-  def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes(), timestamp = 10L)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-
-    // mark the oldest segment as older the retention.ms
-    log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)
-
-    val segments = log.numberOfSegments
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(segments, log.numberOfSegments, "There should be 3 segments 
remaining")
-  }
-
-  @Test
-  def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit = 
{
-    def createRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes(), timestamp = 10L)
-    val recordSize = createRecords.sizeInBytes
-    val logConfig = LogTestUtils.createLogConfig(
-      segmentBytes = recordSize * 2,
-      localRetentionBytes = recordSize / 2,
-      cleanupPolicy = "",
-      remoteLogStorageEnable = true
-    )
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
-    for (_ <- 0 until 10)
-      log.appendAsLeader(createRecords, 0)
-
-    val segmentsBefore = log.numberOfSegments
-    log.updateHighWatermark(log.logEndOffset)
-    log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
-    val deleteOldSegments = log.deleteOldSegments()
-
-    assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be 
deleted due to size retention")
-    assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
-  }
-
-  @Test
-  def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = {
-    val oldTimestamp = mockTime.milliseconds - 20000
-    def oldRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes(), timestamp = oldTimestamp)
-    val recordSize = oldRecords.sizeInBytes
-    val logConfig = LogTestUtils.createLogConfig(
-      segmentBytes = recordSize * 2,
-      localRetentionMs = 5000,
-      cleanupPolicy = "",
-      remoteLogStorageEnable = true
-    )
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
-    for (_ <- 0 until 10)
-      log.appendAsLeader(oldRecords, 0)
-
-    def newRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes(), timestamp = mockTime.milliseconds)
-    for (_ <- 0 until 5)
-      log.appendAsLeader(newRecords, 0)
-
-    val segmentsBefore = log.numberOfSegments
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
-    val deleteOldSegments = log.deleteOldSegments()
-
-    assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be 
deleted due to time retention")
-    assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
-  }
-
-  @Test
-  def 
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete(): Unit 
= {
-    def createRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes, timestamp = 10L)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = 
"compact,delete")
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(1, log.numberOfSegments, "There should be 1 segment 
remaining")
-  }
-
-  @Test
-  def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): 
Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes, key = 
"test".getBytes, timestamp = 10L)
-    val recordsPerSegment = 5
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, 
cleanupPolicy = "compact")
-    val log = createLog(logDir, logConfig, brokerTopicStats)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 15)
-      log.appendAsLeader(createRecords, 0)
-
-    // Three segments should be created
-    assertEquals(3, log.logSegments.asScala.count(_ => true))
-    log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(recordsPerSegment, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-
-    // The first segment, which is entirely before the log start offset, 
should be deleted
-    // Of the remaining the segments, the first can overlap the log start 
offset and the rest must have a base offset
-    // greater than the start offset
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(2, log.numberOfSegments, "There should be 2 segments 
remaining")
-    assertTrue(log.logSegments.asScala.head.baseOffset <= log.logStartOffset)
-    assertTrue(log.logSegments.asScala.tail.forall(s => s.baseOffset > 
log.logStartOffset))
-  }
-
-  @Test
-  def shouldApplyEpochToMessageOnAppendIfLeader(): Unit = {
-    val records = (0 until 50).toArray.map(id => new 
SimpleRecord(id.toString.getBytes))
-
-    //Given this partition is on leader epoch 72
-    val epoch = 72
-    val log = createLog(logDir, new LogConfig(new Properties))
-    log.assignEpochStartOffset(epoch, records.length)
-
-    //When appending messages as a leader (i.e. assignOffsets = true)
-    for (record <- records)
-      log.appendAsLeader(
-        MemoryRecords.withRecords(Compression.NONE, record),
-        epoch
-      )
-
-    //Then leader epoch should be set on messages
-    for (i <- records.indices) {
-      val read = LogTestUtils.readLog(log, i, 
1).records.batches.iterator.next()
-      assertEquals(72, read.partitionLeaderEpoch, "Should have set leader 
epoch")
-    }
-  }
-
-  @Test
-  def 
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache(): Unit 
= {
-    val messageIds = (0 until 50).toArray
-    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
-
-    //Given each message has an offset & epoch, as msgs from leader would
-    def recordsForEpoch(i: Int): MemoryRecords = {
-      val recs = MemoryRecords.withRecords(messageIds(i), Compression.NONE, 
records(i))
-      recs.batches.forEach{record =>
-        record.setPartitionLeaderEpoch(42)
-        record.setLastOffset(i)
-      }
-      recs
-    }
-
-    val log = createLog(logDir, new LogConfig(new Properties))
-
-    //When appending as follower (assignOffsets = false)
-    for (i <- records.indices)
-      log.appendAsFollower(recordsForEpoch(i), i)
-
-    assertEquals(Optional.of(42), log.latestEpoch)
-  }
-
-  @Test
-  def shouldTruncateLeaderEpochsWhenDeletingSegments(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
-    val log = createLog(logDir, logConfig)
-    val cache = epochCache(log)
-
-    // Given three segments of 5 messages each
-    for (_ <- 0 until 15) {
-      log.appendAsLeader(createRecords, 0)
-    }
-
-    //Given epochs
-    cache.assign(0, 0)
-    cache.assign(1, 5)
-    cache.assign(2, 10)
-
-    //When first segment is removed
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-
-    //The oldest epoch entry should have been removed
-    assertEquals(util.List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), 
cache.epochEntries)
-  }
-
-  @Test
-  def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments(): Unit = {
-    def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
-    val log = createLog(logDir, logConfig)
-    val cache = epochCache(log)
-
-    // Given three segments of 5 messages each
-    for (_ <- 0 until 15) {
-      log.appendAsLeader(createRecords, 0)
-    }
-
-    //Given epochs
-    cache.assign(0, 0)
-    cache.assign(1, 7)
-    cache.assign(2, 10)
-
-    //When first segment removed (up to offset 5)
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-
-    //The first entry should have gone from (0,0) => (0,5)
-    assertEquals(util.List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new 
EpochEntry(2, 10)), cache.epochEntries)
-  }
-
-  @Test
-  def shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog(): Unit = {
-    def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
-      TestUtils.records(Seq(new SimpleRecord("value".getBytes)),
-        baseOffset = startOffset, partitionLeaderEpoch = epoch)
-    }
-
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10 * 
createRecords(0, 0).sizeInBytes)
-    val log = createLog(logDir, logConfig)
-    val cache = epochCache(log)
-
-    def append(epoch: Int, startOffset: Long, count: Int): Unit = {
-      for (i <- 0 until count)
-        log.appendAsFollower(createRecords(startOffset + i, epoch), epoch)
-    }
-
-    //Given 2 segments, 10 messages per segment
-    append(epoch = 0, startOffset = 0, count = 10)
-    append(epoch = 1, startOffset = 10, count = 6)
-    append(epoch = 2, startOffset = 16, count = 4)
-
-    assertEquals(2, log.numberOfSegments)
-    assertEquals(20, log.logEndOffset)
-
-    //When truncate to LEO (no op)
-    log.truncateTo(log.logEndOffset)
-
-    //Then no change
-    assertEquals(3, cache.epochEntries.size)
-
-    //When truncate
-    log.truncateTo(11)
-
-    //Then no change
-    assertEquals(2, cache.epochEntries.size)
-
-    //When truncate
-    log.truncateTo(10)
-
-    //Then
-    assertEquals(1, cache.epochEntries.size)
-
-    //When truncate all
-    log.truncateTo(0)
-
-    //Then
-    assertEquals(0, cache.epochEntries.size)
-  }
-
-  @Test
-  def testFirstUnstableOffsetNoTransactionalData(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    val records = MemoryRecords.withRecords(Compression.NONE,
-      new SimpleRecord("foo".getBytes),
-      new SimpleRecord("bar".getBytes),
-      new SimpleRecord("baz".getBytes))
-
-    log.appendAsLeader(records, 0)
-    assertEquals(Optional.empty, log.firstUnstableOffset)
-  }
-
-  @Test
-  def testFirstUnstableOffsetWithTransactionalData(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    val pid = 137L
-    val epoch = 5.toShort
-    var seq = 0
-
-    // add some transactional records
-    val records = MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid, epoch, seq,
-      new SimpleRecord("foo".getBytes),
-      new SimpleRecord("bar".getBytes),
-      new SimpleRecord("baz".getBytes))
-
-    val firstAppendInfo = log.appendAsLeader(records, 0)
-    assertEquals(Optional.of(firstAppendInfo.firstOffset), 
log.firstUnstableOffset)
-
-    // add more transactional records
-    seq += 3
-    
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid, epoch, seq,
-      new SimpleRecord("blah".getBytes)), 0)
-
-    // LSO should not have changed
-    assertEquals(Optional.of(firstAppendInfo.firstOffset), 
log.firstUnstableOffset)
-
-    // now transaction is committed
-    val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, 
epoch, ControlRecordType.COMMIT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-
-    // first unstable offset is not updated until the high watermark is 
advanced
-    assertEquals(Optional.of(firstAppendInfo.firstOffset), 
log.firstUnstableOffset)
-    log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
-
-    // now there should be no first unstable offset
-    assertEquals(Optional.empty, log.firstUnstableOffset)
-  }
-
   @Test
   def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
@@ -3535,6 +3067,7 @@ class UnifiedLogTest {
     }
   }
 
+
   @Test
   def testTransactionIndexUpdated(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
@@ -3960,7 +3493,7 @@ class UnifiedLogTest {
     log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(8L, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
     log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(1, log.logSegments.size)
 
     // the first unstable offset should be lower bounded by the log start 
offset
@@ -4160,7 +3693,7 @@ class UnifiedLogTest {
     assertEquals(25L, initialHighWatermark)
 
     val initialNumSegments = log.numberOfSegments
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertTrue(log.numberOfSegments < initialNumSegments)
     assertTrue(log.logStartOffset <= initialHighWatermark)
   }
@@ -4870,7 +4403,7 @@ class UnifiedLogTest {
 
     mockTime.sleep(2)
     // It should have rolled the active segment as they are eligible for 
deletion
-    log.deleteOldSegments()
+    assertEquals(0, log.deleteOldSegments())
     assertEquals(2, log.logSegments.size)
     log.logSegments.asScala.zipWithIndex.foreach {
       case (segment, idx) => assertEquals(idx, segment.baseOffset)
@@ -4878,7 +4411,7 @@ class UnifiedLogTest {
 
     // Once rolled, the segment should be uploaded to remote storage and 
eligible for deletion
     log.updateHighestOffsetInRemoteStorage(1)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(1, log.logSegments.size)
     assertEquals(1, log.logSegments.asScala.head.baseOffset())
     assertEquals(1, log.localLogStartOffset())
@@ -4910,14 +4443,14 @@ class UnifiedLogTest {
 
     // No segments are uploaded to remote storage, none of the local log 
segments should be eligible for deletion
     log.updateHighestOffsetInRemoteStorage(-1L)
-    log.deleteOldSegments()
+    assertEquals(0, log.deleteOldSegments())
     mockTime.sleep(1)
     assertEquals(2, log.logSegments.size)
     assertFalse(log.isEmpty)
 
     // Update the log-start-offset from 0 to 3, then the base segment should 
not be eligible for deletion
     log.updateLogStartOffsetFromRemoteTier(3L)
-    log.deleteOldSegments()
+    assertEquals(0, log.deleteOldSegments())
     mockTime.sleep(1)
     assertEquals(2, log.logSegments.size)
     assertFalse(log.isEmpty)
@@ -4925,13 +4458,13 @@ class UnifiedLogTest {
     // Update the log-start-offset from 3 to 4, then the base segment should 
be eligible for deletion now even
     // if it is not uploaded to remote storage
     log.updateLogStartOffsetFromRemoteTier(4L)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     mockTime.sleep(1)
     assertEquals(1, log.logSegments.size)
     assertFalse(log.isEmpty)
 
     log.updateLogStartOffsetFromRemoteTier(5L)
-    log.deleteOldSegments()
+    assertEquals(0, log.deleteOldSegments())
     mockTime.sleep(1)
     assertEquals(1, log.logSegments.size)
     assertTrue(log.isEmpty)
@@ -4954,7 +4487,7 @@ class UnifiedLogTest {
     log.updateHighWatermark(log.logEndOffset)
     // simulate calls to upload 2 segments to remote storage
     log.updateHighestOffsetInRemoteStorage(1)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(4, log.logSegments.size())
     assertEquals(0, log.logStartOffset)
     assertEquals(2, log.localLogStartOffset())
@@ -4979,7 +4512,7 @@ class UnifiedLogTest {
     log.updateHighestOffsetInRemoteStorage(1)
 
     mockTime.sleep(1001)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(4, log.logSegments.size())
     assertEquals(0, log.logStartOffset)
     assertEquals(2, log.localLogStartOffset())
@@ -5002,7 +4535,7 @@ class UnifiedLogTest {
     log.updateHighWatermark(log.logEndOffset)
 
     // Should not delete local log because highest remote storage offset is -1 
(default value)
-    log.deleteOldSegments()
+    assertEquals(0, log.deleteOldSegments())
     assertEquals(6, log.logSegments.size())
     assertEquals(0, log.logStartOffset)
     assertEquals(0, log.localLogStartOffset())
@@ -5010,7 +4543,7 @@ class UnifiedLogTest {
     // simulate calls to upload 2 segments to remote storage
     log.updateHighestOffsetInRemoteStorage(1)
 
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(4, log.logSegments.size())
     assertEquals(0, log.logStartOffset)
     assertEquals(2, log.localLogStartOffset())
@@ -5022,7 +4555,7 @@ class UnifiedLogTest {
 
     // No local logs will be deleted even though local retention bytes is 1 
because we'll adopt retention.ms/bytes
     // when remote.log.copy.disable = true
-    log.deleteOldSegments()
+    assertEquals(0, log.deleteOldSegments())
     assertEquals(4, log.logSegments.size())
     assertEquals(0, log.logStartOffset)
     assertEquals(2, log.localLogStartOffset())
@@ -5042,7 +4575,7 @@ class UnifiedLogTest {
 
     // try to delete local logs again, 2 segments will be deleted this time 
because we'll adopt retention.ms/bytes (retention.bytes = 5)
     // when remote.log.copy.disable = true
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(5, log.logSegments.size())
     assertEquals(4, log.logStartOffset)
     assertEquals(4, log.localLogStartOffset())
@@ -5054,14 +4587,14 @@ class UnifiedLogTest {
 
     // Should not delete any logs because no local logs expired using 
retention.ms = 1000
     mockTime.sleep(10)
-    log.deleteOldSegments()
+    assertEquals(0, log.deleteOldSegments())
     assertEquals(5, log.logSegments.size())
     assertEquals(4, log.logStartOffset)
     assertEquals(4, log.localLogStartOffset())
 
     // Should delete all logs because all of them are expired based on 
retentionMs = 1000
     mockTime.sleep(1000)
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(1, log.logSegments.size())
     assertEquals(9, log.logStartOffset)
     assertEquals(9, log.localLogStartOffset())
@@ -5085,7 +4618,7 @@ class UnifiedLogTest {
     // simulate calls to upload 3 segments to remote storage
     log.updateHighestOffsetInRemoteStorage(30)
 
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(2, log.logSegments.size())
     assertEquals(0, log.logStartOffset)
     assertEquals(31, log.localLogStartOffset())
@@ -5109,7 +4642,7 @@ class UnifiedLogTest {
     // simulate calls to upload 3 segments to remote storage
     log.updateHighestOffsetInRemoteStorage(30)
 
-    log.deleteOldSegments()
+    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
     assertEquals(2, log.logSegments.size())
     assertEquals(0, log.logStartOffset)
     assertEquals(31, log.localLogStartOffset())
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 50b666a0fb1..6a89b82043d 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.RequestLocal;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class LogTestUtils {
     public static LogSegment createSegment(long offset, File logDir, int 
indexIntervalBytes, Time time) throws IOException {
@@ -33,4 +40,119 @@ public class LogTestUtils {
         // Create and return the LogSegment instance
         return new LogSegment(ms, idx, timeIdx, txnIndex, offset, 
indexIntervalBytes, 0, time);
     }
+
+
+    /**
+     * Append an end transaction marker (commit or abort) to the log as a 
leader.
+     *
+     * @param transactionVersion the transaction version (e.g. 1 = TV1, 2 = TV2). 
Must be explicitly specified.
+     *                          TV2 markers require strict epoch validation 
(markerEpoch > currentEpoch),
+     *                          while legacy markers use relaxed validation 
(markerEpoch >= currentEpoch).
+     */
+    public static LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log,
+                                                           long producerId,
+                                                           short producerEpoch,
+                                                           ControlRecordType 
controlType,
+                                                           long timestamp,
+                                                           int 
coordinatorEpoch,
+                                                           int leaderEpoch,
+                                                           short 
transactionVersion) {
+        MemoryRecords records = endTxnRecords(controlType, producerId, 
producerEpoch, 0L, coordinatorEpoch, leaderEpoch, timestamp);
+
+        return log.appendAsLeader(records, leaderEpoch, 
AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
transactionVersion);
+    }
+
+    public static MemoryRecords endTxnRecords(ControlRecordType 
controlRecordType,
+                                              long producerId,
+                                              short epoch,
+                                              long offset,
+                                              int coordinatorEpoch,
+                                              int partitionLeaderEpoch,
+                                              long timestamp) {
+        EndTransactionMarker marker = new 
EndTransactionMarker(controlRecordType, coordinatorEpoch);
+        return MemoryRecords.withEndTransactionMarker(offset, timestamp, 
partitionLeaderEpoch, producerId, epoch, marker);
+    }
+
+    public static class LogConfigBuilder {
+        private final Map<String, Object> configs = new HashMap<>();
+
+        public LogConfigBuilder segmentMs(long segmentMs) {
+            configs.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs);
+            return this;
+        }
+
+        public LogConfigBuilder segmentBytes(int segmentBytes) {
+            configs.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentBytes);
+            return this;
+        }
+
+        public LogConfigBuilder retentionMs(long retentionMs) {
+            configs.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs);
+            return this;
+        }
+
+        public LogConfigBuilder localRetentionMs(long localRetentionMs) {
+            configs.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, 
localRetentionMs);
+            return this;
+        }
+
+        public LogConfigBuilder retentionBytes(long retentionBytes) {
+            configs.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
+            return this;
+        }
+
+        public LogConfigBuilder localRetentionBytes(long localRetentionBytes) {
+            configs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes);
+            return this;
+        }
+
+        public LogConfigBuilder segmentJitterMs(long segmentJitterMs) {
+            configs.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, segmentJitterMs);
+            return this;
+        }
+
+        public LogConfigBuilder cleanupPolicy(String cleanupPolicy) {
+            configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy);
+            return this;
+        }
+
+        public LogConfigBuilder maxMessageBytes(int maxMessageBytes) {
+            configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes);
+            return this;
+        }
+
+        public LogConfigBuilder indexIntervalBytes(int indexIntervalBytes) {
+            configs.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 
indexIntervalBytes);
+            return this;
+        }
+
+        public LogConfigBuilder segmentIndexBytes(int segmentIndexBytes) {
+            configs.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 
segmentIndexBytes);
+            return this;
+        }
+
+        public LogConfigBuilder fileDeleteDelayMs(long fileDeleteDelayMs) {
+            configs.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 
fileDeleteDelayMs);
+            return this;
+        }
+
+        public LogConfigBuilder remoteLogStorageEnable(boolean 
remoteLogStorageEnable) {
+            configs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
remoteLogStorageEnable);
+            return this;
+        }
+
+        public LogConfigBuilder remoteLogCopyDisable(boolean 
remoteLogCopyDisable) {
+            configs.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, 
remoteLogCopyDisable);
+            return this;
+        }
+
+        public LogConfigBuilder remoteLogDeleteOnDisable(boolean 
remoteLogDeleteOnDisable) {
+            configs.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
remoteLogDeleteOnDisable);
+            return this;
+        }
+
+        public LogConfig build() {
+            return new LogConfig(configs);
+        }
+    }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index ea14932ff20..0c7b4271d00 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -16,17 +16,63 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.TransactionVersion;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class UnifiedLogTest {
 
     private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(false);
+    private final MockTime mockTime = new MockTime();
+    private final int maxTransactionTimeoutMs = 60 * 60 * 1000;
+    private final ProducerStateManagerConfig producerStateManagerConfig = new 
ProducerStateManagerConfig(maxTransactionTimeoutMs, false);
+    private final List<UnifiedLog> logsToClose = new ArrayList<>();
+
+    private UnifiedLog log;
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        brokerTopicStats.close();
+        for (UnifiedLog log : logsToClose) {
+            log.close();
+        }
+        Utils.delete(tmpDir);
+    }
 
     @Test
     public void testOffsetFromProducerSnapshotFile() {
@@ -34,4 +80,671 @@ public class UnifiedLogTest {
         File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset);
         assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile));
     }
+
+    @Test
+    public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException 
{
+        SimpleRecord[] records = java.util.stream.IntStream.range(0, 50)
+            .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+            .toArray(SimpleRecord[]::new);
+
+        // Given this partition is on leader epoch 72
+        int epoch = 72;
+        try (UnifiedLog log = createLog(logDir, new LogConfig(new 
Properties()))) {
+            log.assignEpochStartOffset(epoch, records.length);
+
+            // When appending messages as a leader (i.e. assignOffsets = true)
+            for (SimpleRecord record : records) {
+                log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, 
record), epoch);
+            }
+
+            // Then leader epoch should be set on messages
+            for (int i = 0; i < records.length; i++) {
+                FetchDataInfo read = log.read(i, 1, FetchIsolation.LOG_END, 
true);
+                RecordBatch batch = read.records.batches().iterator().next();
+                assertEquals(epoch, batch.partitionLeaderEpoch(), "Should have 
set leader epoch");
+            }
+        }
+    }
+
+    @Test
+    public void 
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() 
throws IOException {
+        int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray();
+        SimpleRecord[] records = Arrays.stream(messageIds)
+            .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+            .toArray(SimpleRecord[]::new);
+
+        //Given each message has an offset & epoch, as msgs from leader would
+        Function<Integer, MemoryRecords> recordsForEpoch = i -> {
+            MemoryRecords recs = MemoryRecords.withRecords(messageIds[i], 
Compression.NONE, records[i]);
+            recs.batches().forEach(record -> {
+                record.setPartitionLeaderEpoch(42);
+                record.setLastOffset(i);
+            });
+            return recs;
+        };
+
+        try (UnifiedLog log = createLog(logDir, new LogConfig(new 
Properties()))) {
+            // When appending the messages as a follower
+            for (int i = 0; i < records.length; i++) {
+                log.appendAsFollower(recordsForEpoch.apply(i), i);
+            }
+
+            assertEquals(Optional.of(42), log.latestEpoch());
+        }
+    }
+
+    @Test
+    public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws 
IOException {
+        Supplier<MemoryRecords>  records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionBytes(records.get().sizeInBytes() * 10L)
+                .build();
+
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        // Given three segments of 5 messages each
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Given epochs
+        cache.assign(0, 0);
+        cache.assign(1, 5);
+        cache.assign(2, 10);
+
+        // When first segment is removed
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+
+        //The oldest epoch entry should have been removed
+        assertEquals(List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), 
cache.epochEntries());
+    }
+
+    @Test
+    public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionBytes(records.get().sizeInBytes() * 10L)
+                .build();
+
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        // Given three segments of 5 messages each
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Given epochs
+        cache.assign(0, 0);
+        cache.assign(1, 7);
+        cache.assign(2, 10);
+
+        // When first segment removed (up to offset 5)
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+
+        //The first entry should have gone from (0,0) => (0,5)
+        assertEquals(List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new 
EpochEntry(2, 10)), cache.epochEntries());
+    }
+
+    @Test
+    public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> records(List.of(new 
SimpleRecord("value".getBytes())), 0, 0);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(10 * records.get().sizeInBytes())
+                .build();
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        //Given 2 segments, 10 messages per segment
+        append(0, 0, 10);
+        append(1, 10, 6);
+        append(2, 16, 4);
+
+        assertEquals(2, log.numberOfSegments());
+        assertEquals(20, log.logEndOffset());
+
+        // When truncate to LEO (no op)
+        log.truncateTo(log.logEndOffset());
+        // Then no change
+        assertEquals(3, cache.epochEntries().size());
+
+        // When truncate
+        log.truncateTo(11);
+        // Then one epoch entry is removed
+        assertEquals(2, cache.epochEntries().size());
+
+        // When truncate
+        log.truncateTo(10);
+        assertEquals(1, cache.epochEntries().size());
+
+        // When truncate all
+        log.truncateTo(0);
+        assertEquals(0, cache.epochEntries().size());
+    }
+
+    @Test
+    public void shouldDeleteSizeBasedSegments() throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionBytes(records.get().sizeInBytes() * 10L)
+                .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(2, log.numberOfSegments(), "should have 2 segments");
+    }
+
+    @Test
+    public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionBytes(records.get().sizeInBytes() * 15L)
+                .build();
+
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(3, log.numberOfSegments(), "should have 3 segments");
+    }
+
+    @Test
+    public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 15)
+                .retentionMs(10000L)
+                .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(1, log.numberOfSegments(), "There should be 1 segment 
remaining");
+    }
+
+    @Test
+    public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), mockTime.milliseconds());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionMs(10000000)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(3, log.numberOfSegments(), "There should be 3 segments 
remaining");
+    }
+
+    @Test
+    public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionMs(10000)
+                .cleanupPolicy("compact")
+                .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // mark the oldest segment as older than retention.ms
+        
log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 
20000);
+
+        int segments = log.numberOfSegments();
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(segments, log.numberOfSegments(), "There should be 3 
segments remaining");
+    }
+
+    @Test
+    public void 
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionBytes(records.get().sizeInBytes() * 10L)
+                .cleanupPolicy("compact, delete")
+                .build();
+
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(1, log.numberOfSegments(), "There should be 1 segment 
remaining");
+    }
+
+    @Test
+    public void 
shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        int recordSize = records.get().sizeInBytes();
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(recordSize * 2)
+                .retentionBytes(recordSize / 2)
+                .cleanupPolicy("")
+                .remoteLogStorageEnable(true)
+                .build();
+        log = createLog(logDir, config, true);
+
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        int segmentsBefore = log.numberOfSegments();
+        log.updateHighWatermark(log.logEndOffset());
+        log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1);
+        int deletedSegments = log.deleteOldSegments();
+
+        assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments 
should be deleted due to size retention");
+        assertTrue(deletedSegments > 0, "At least one segment should be 
deleted");
+    }
+
+    @Test
+    public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention() 
throws IOException {
+        long oldTimestamp = mockTime.milliseconds() - 20000;
+        Supplier<MemoryRecords> oldRecords = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), oldTimestamp);
+        int recordSize = oldRecords.get().sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(recordSize * 2)
+                .localRetentionMs(5000)
+                .cleanupPolicy("")
+                .remoteLogStorageEnable(true)
+                .build();
+        log = createLog(logDir, logConfig, true);
+
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(oldRecords.get(), 0);
+        }
+
+        Supplier<MemoryRecords> newRecords = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), mockTime.milliseconds());
+        for (int i = 0; i < 5; i++) {
+            log.appendAsLeader(newRecords.get(), 0);
+        }
+
+        int segmentsBefore = log.numberOfSegments();
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1);
+        int deletedSegments = log.deleteOldSegments();
+
+        assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments 
should be deleted due to time retention");
+        assertTrue(deletedSegments > 0, "At least one segment should be 
deleted");
+    }
+
+    @Test
+    public void testLogDeletionAfterDeleteRecords() throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+        assertEquals(3, log.numberOfSegments());
+        assertEquals(0, log.logStartOffset());
+        log.updateHighWatermark(log.logEndOffset());
+
+        // The logStartOffset is still within the first segment, so that segment is not deleted.
+        log.maybeIncrementLogStartOffset(1, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(3, log.numberOfSegments());
+        assertEquals(1, log.logStartOffset());
+
+        log.maybeIncrementLogStartOffset(6, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(2, log.numberOfSegments());
+        assertEquals(6, log.logStartOffset());
+
+        log.maybeIncrementLogStartOffset(15, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(1, log.numberOfSegments());
+        assertEquals(15, log.logStartOffset());
+    }
+
+    @Test
+    public void testLogDeletionAfterClose() throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000);
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .segmentIndexBytes(1000)
+                .retentionMs(999)
+                .build();
+        log = createLog(logDir, logConfig);
+        // avoid close after test because it is closed in this test
+        logsToClose.remove(log);
+
+        // append some messages to create some segments
+        log.appendAsLeader(records.get(), 0);
+
+        assertEquals(1, log.numberOfSegments(), "The deleted segments should 
be gone.");
+        assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries 
should have gone.");
+
+        log.close();
+        log.delete();
+        assertEquals(0, log.numberOfSegments());
+        assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries 
should have gone.");
+    }
+
+    @Test
+    public void testDeleteOldSegments() throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000);
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .segmentIndexBytes(1000)
+                .retentionMs(999)
+                .build();
+        log = createLog(logDir, logConfig);
+        // avoid close after test because it is deleted in this test
+        logsToClose.remove(log);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 100; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.assignEpochStartOffset(0, 40);
+        log.assignEpochStartOffset(1, 90);
+
+        // segments are not eligible for deletion if no high watermark has 
been set
+        int numSegments = log.numberOfSegments();
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(numSegments, log.numberOfSegments());
+        assertEquals(0L, log.logStartOffset());
+
+        // only segments with offset before the current high watermark are 
eligible for deletion
+        for (long hw = 25; hw <= 30; hw++) {
+            log.updateHighWatermark(hw);
+            log.deleteOldSegments();
+            assertTrue(log.logStartOffset() <= hw);
+            long finalHw = hw;
+            log.logSegments().forEach(segment -> {
+                FetchDataInfo segmentFetchInfo;
+                try {
+                    segmentFetchInfo = segment.read(segment.baseOffset(), 
Integer.MAX_VALUE);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                Optional<RecordBatch> lastBatch = Optional.empty();
+                for (RecordBatch batch : segmentFetchInfo.records.batches()) {
+                    lastBatch = Optional.of(batch);
+                }
+                lastBatch.ifPresent(batch -> assertTrue(batch.lastOffset() >= 
finalHw));
+            });
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(1, log.numberOfSegments(), "The deleted segments should 
be gone.");
+        assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries 
should have gone.");
+        assertEquals(new EpochEntry(1, 100), 
epochCache(log).epochEntries().get(0), "Epoch entry should be the latest epoch 
and the leo.");
+
+        for (int i = 0; i < 100; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.delete();
+        assertEquals(0, log.numberOfSegments(), "The number of segments should 
be 0");
+        assertEquals(0, log.deleteOldSegments(), "The number of deleted 
segments should be zero.");
+        assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries 
should have gone.");
+    }
+
+    @Test
+    public void 
shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        int recordsPerSegment = 5;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * recordsPerSegment)
+                .segmentIndexBytes(1000)
+                .cleanupPolicy("compact")
+                .build();
+        log = createLog(logDir, logConfig);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        assertEquals(3, log.numberOfSegments());
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(recordsPerSegment, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+        // The first segment, which is entirely before the log start offset, 
should be deleted
+        // Of the remaining segments, the first can overlap the log start 
offset and the rest must have a base offset
+        // greater than the start offset.
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(2, log.numberOfSegments(), "There should be 2 segments 
remaining");
+        assertTrue(log.logSegments().iterator().next().baseOffset() <= 
log.logStartOffset());
+        log.logSegments().forEach(segment -> {
+            if (log.logSegments().iterator().next() != segment) {
+                assertTrue(segment.baseOffset() > log.logStartOffset());
+            }
+        });
+    }
+
+    /**
+     * Appends one batch built with {@code MemoryRecords.withRecords} (no producer id, epoch or
+     * sequence supplied) as leader and expects {@code firstUnstableOffset()} to be empty.
+     */
+    @Test
+    public void testFirstUnstableOffsetNoTransactionalData() throws 
IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1024 * 1024 * 5)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
+            new SimpleRecord("foo".getBytes()),
+            new SimpleRecord("bar".getBytes()),
+            new SimpleRecord("baz".getBytes()));
+
+        log.appendAsLeader(records, 0);
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    /**
+     * Follows {@code firstUnstableOffset()} through one transaction: it must equal the first
+     * offset of the first transactional batch after each transactional append and after the
+     * COMMIT marker is written, and become empty once the high watermark is moved past the marker.
+     */
+    @Test
+    public void testFirstUnstableOffsetWithTransactionalData() throws 
IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1024 * 1024 * 5)
+                .build();
+        log = createLog(logDir, logConfig);
+
+        long pid = 137L;
+        short epoch = 5;
+        int seq = 0;
+
+        // add some transactional records
+        MemoryRecords records = MemoryRecords.withTransactionalRecords(
+                Compression.NONE, pid, epoch, seq,
+                new SimpleRecord("foo".getBytes()),
+                new SimpleRecord("bar".getBytes()),
+                new SimpleRecord("baz".getBytes()));
+
+        LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+
+        // add more transactional records
+        // The first batch carried three records, so the sequence advances by three.
+        seq += 3;
+        
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid, epoch, seq,
+            new SimpleRecord("blah".getBytes())), 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+
+        // now transaction is committed
+        LogAppendInfo commitAppendInfo = 
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
+                ControlRecordType.COMMIT, mockTime.milliseconds(), 0, 0, 
TransactionVersion.TV_0.featureLevel());
+
+        // first unstable offset is not updated until the high watermark is 
advanced
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+        log.updateHighWatermark(commitAppendInfo.lastOffset() + 1);
+
+        // now there should be no first unstable offset
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    /**
+     * Appends {@code count} single-record batches to {@code log} as a follower.
+     * Batch {@code i} is built with base offset {@code startOffset + i} and with {@code epoch}
+     * as its partition leader epoch; the same {@code epoch} is passed to {@code appendAsFollower}.
+     *
+     * @param epoch       leader epoch stamped on every batch and passed to the append call
+     * @param startOffset base offset of the first batch
+     * @param count       number of batches to append
+     */
+    private void append(int epoch, long startOffset, int count) {
+        Function<Integer, MemoryRecords> records = i ->
+                records(List.of(new SimpleRecord("value".getBytes())), 
startOffset + i, epoch);
+        for (int i = 0; i < count; i++) {
+            log.appendAsFollower(records.apply(i), epoch);
+        }
+    }
+
+    /** Returns the leader epoch cache of the given log, i.e. {@code log.leaderEpochCache()}. */
+    private LeaderEpochFileCache epochCache(UnifiedLog log) {
+        return log.leaderEpochCache();
+    }
+
+    /**
+     * Creates a log in {@code dir} with the given config, passing {@code false} for
+     * {@code remoteStorageSystemEnable}.
+     */
+    private UnifiedLog createLog(File dir, LogConfig config) throws 
IOException {
+        return createLog(dir, config, false);
+    }
+
+    /**
+     * Creates a log using this test's {@code brokerTopicStats}, {@code mockTime} (and its
+     * scheduler) and {@code producerStateManagerConfig}, with no topic id.
+     */
+    private UnifiedLog createLog(File dir, LogConfig config, boolean 
remoteStorageSystemEnable) throws IOException {
+        return createLog(dir, config, this.brokerTopicStats, 
mockTime.scheduler, this.mockTime,
+                this.producerStateManagerConfig, Optional.empty(), 
remoteStorageSystemEnable);
+    }
+
+    /**
+     * Builds a log through {@code UnifiedLog.create}, records it in {@code logsToClose} and
+     * returns it.
+     * NOTE(review): the positional literals passed below (0L, 0L, 3600000, true) are not named
+     * here; presumably log start offset, recovery point, max transaction timeout and a
+     * boolean flag -- confirm against the {@code UnifiedLog.create} signature.
+     */
+    private UnifiedLog createLog(
+            File dir,
+            LogConfig config,
+            BrokerTopicStats brokerTopicStats,
+            Scheduler scheduler,
+            MockTime time,
+            ProducerStateManagerConfig producerStateManagerConfig,
+            Optional<Uuid> topicId,
+            boolean remoteStorageSystemEnable) throws IOException {
+
+        UnifiedLog log = UnifiedLog.create(
+                dir,
+                config,
+                0L,
+                0L,
+                scheduler,
+                brokerTopicStats,
+                time,
+                3600000,
+                producerStateManagerConfig,
+                
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+                new LogDirFailureChannel(10),
+                true,
+                topicId,
+                new ConcurrentHashMap<>(),
+                remoteStorageSystemEnable,
+                LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+        );
+
+        // Track every created log; presumably the test teardown closes the entries of logsToClose.
+        this.logsToClose.add(log);
+        return log;
+    }
+
+    /**
+     * Builds a one-record batch for {@code key}/{@code value} with {@code Compression.NONE},
+     * {@code RecordBatch.NO_TIMESTAMP} and {@code RecordBatch.CURRENT_MAGIC_VALUE}.
+     */
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    /**
+     * Builds a one-record batch with a {@code null} key, the given timestamp,
+     * {@code Compression.NONE} and {@code RecordBatch.CURRENT_MAGIC_VALUE}.
+     */
+    public static MemoryRecords singletonRecords(byte[] value, long timestamp) 
{
+        return singletonRecords(value, null, Compression.NONE, timestamp, 
RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    /**
+     * Builds a one-record batch holding only {@code value}: {@code null} key,
+     * {@code RecordBatch.NO_TIMESTAMP}, no compression, no producer id/epoch/sequence,
+     * base offset 0 and {@code RecordBatch.NO_PARTITION_LEADER_EPOCH}.
+     */
+    public static MemoryRecords singletonRecords(
+            byte[] value
+    ) {
+        return records(List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, 
null, value)),
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                Compression.NONE,
+                RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH,
+                RecordBatch.NO_SEQUENCE,
+                0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+    }
+
+    /**
+     * Builds a one-record batch from the given key, value and timestamp using the supplied
+     * compression and magic value. Producer id, producer epoch and sequence are the
+     * {@code RecordBatch.NO_*} constants, the base offset is 0 and the partition leader epoch
+     * is {@code RecordBatch.NO_PARTITION_LEADER_EPOCH}.
+     */
+    public static MemoryRecords singletonRecords(
+            byte[] value,
+            byte[] key,
+            Compression codec,
+            long timestamp,
+            byte magicValue
+    ) {
+        return records(List.of(new SimpleRecord(timestamp, key, value)),
+                magicValue, codec,
+                RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH,
+                RecordBatch.NO_SEQUENCE,
+                0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+    }
+
+    /**
+     * Builds a one-record batch for {@code key}/{@code value} with the given timestamp,
+     * {@code Compression.NONE} and {@code RecordBatch.CURRENT_MAGIC_VALUE}.
+     */
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key, 
long timestamp) {
+        return singletonRecords(value, key, Compression.NONE, timestamp, 
RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    /**
+     * Builds one batch from {@code records} with the current magic value, no compression,
+     * no producer id/epoch/sequence, base offset 0 and no partition leader epoch.
+     */
+    public static MemoryRecords records(List<SimpleRecord> records) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    /**
+     * Same defaults as {@code records(List)} (current magic, no compression, no producer
+     * fields, no partition leader epoch) but with the caller-supplied {@code baseOffset}.
+     */
+    public static MemoryRecords records(List<SimpleRecord> records, long 
baseOffset) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 
baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    /**
+     * Builds one batch with the current magic value, no compression and no producer fields,
+     * using the caller-supplied {@code baseOffset} and {@code partitionLeaderEpoch}.
+     */
+    public static MemoryRecords records(List<SimpleRecord> records, long 
baseOffset, int partitionLeaderEpoch) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 
baseOffset, partitionLeaderEpoch);
+    }
+
+    /**
+     * Builds one batch with the caller-supplied magic value and compression, no producer
+     * fields, base offset 0 and no partition leader epoch.
+     */
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, Compression compression) {
+        return records(records, magicValue, compression, 
RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    /**
+     * Builds a single batch containing {@code records}, in order, via
+     * {@code MemoryRecords.builder} with {@code TimestampType.CREATE_TIME}.
+     * The backing buffer is allocated with {@code DefaultRecordBatch.sizeInBytes(records)}.
+     * NOTE(review): the {@code System.currentTimeMillis()} argument looks like the builder's
+     * log-append time and the literal {@code false} like its transactional flag -- confirm
+     * against the {@code MemoryRecords.builder} signature.
+     */
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression compression,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        ByteBuffer buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, 
compression, TimestampType.CREATE_TIME, baseOffset,
+                System.currentTimeMillis(), producerId, producerEpoch, 
sequence, false, partitionLeaderEpoch);
+        for (SimpleRecord record : records) {
+            builder.append(record);
+        }
+        return builder.build();
+    }
 }
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 4c75272edd4..a9e49dd0492 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -35,6 +35,7 @@ import static java.lang.String.format;
  * Helper functions for writing unit tests.
  * <p>
  * <b>Package-private:</b> Not intended for use outside {@code 
org.apache.kafka.common.test}.
+ * Use {@code org.apache.kafka.test.TestUtils} instead.
  */
 class TestUtils {
     private static final Logger log = LoggerFactory.getLogger(TestUtils.class);

Reply via email to