This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06a25151094 KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21644)
06a25151094 is described below
commit 06a25151094fa9c58f0baf7c43b5c123908c21f7
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Mar 9 10:29:54 2026 +0100
KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21644)
UnifiedLogTest is ~4000 lines, let's convert it into smaller chunks. Here is roughly the first 1000 lines.
Reviewers: Christo Lolov <[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 934 +-----------------
.../kafka/storage/internals/log/LogTestUtils.java | 93 +-
.../storage/internals/log/UnifiedLogTest.java | 1028 +++++++++++++++++++-
3 files changed, 1113 insertions(+), 942 deletions(-)
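
As a rough illustration of the conversion pattern applied throughout this patch, a Scala assertion in the core module such as assertEquals(Optional.of(3L), log.firstUnstableOffset) maps onto the equivalent Java in the storage module, using the LogConfigBuilder and accessor-style calls shown in the hunks below:

    UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build());
    assertEquals(Optional.of(3L), log.firstUnstableOffset());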
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index c54f587d452..f4dc46b2f39 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -27,11 +27,10 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.record.internal.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.record.internal.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.internal._
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
-import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
@@ -39,10 +38,10 @@ import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, DelayedRemoteListOffsets}
import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}
-import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
+import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
@@ -64,7 +63,6 @@ import java.nio.file.Files
import java.util
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
import java.util.{Optional, OptionalLong, Properties}
-import scala.collection.immutable.SortedSet
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
@@ -98,932 +96,6 @@ class UnifiedLogTest {
}
}
- @Test
- def testHighWatermarkMetadataUpdatedAfterSegmentRoll(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- def assertFetchSizeAndOffsets(fetchOffset: Long,
- expectedSize: Int,
- expectedOffsets: Seq[Long]): Unit = {
- val readInfo = log.read(
- fetchOffset,
- 2048,
- FetchIsolation.HIGH_WATERMARK,
- false)
- assertEquals(expectedSize, readInfo.records.sizeInBytes)
- assertEquals(expectedOffsets, readInfo.records.records.asScala.map(_.offset))
- }
-
- val records = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
- ))
-
- log.appendAsLeader(records, 0)
- assertFetchSizeAndOffsets(fetchOffset = 0L, 0, Seq())
-
- log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
- assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 2))
-
- log.roll()
- assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 2))
-
- log.appendAsLeader(records, 0)
- assertFetchSizeAndOffsets(fetchOffset = 3L, 0, Seq())
- }
-
- @Test
- def testAppendAsLeaderWithRaftLeader(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
- val leaderEpoch = 0
-
- def records(offset: Long): MemoryRecords = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
- ), baseOffset = offset, partitionLeaderEpoch = leaderEpoch)
-
- log.appendAsLeader(records(0), leaderEpoch, AppendOrigin.RAFT_LEADER)
- assertEquals(0, log.logStartOffset)
- assertEquals(3L, log.logEndOffset)
-
- // Since the raft leader is responsible for assigning offsets and the LogValidator is bypassed for performance reasons,
- // the first offset of the MemoryRecords to be appended must equal the next offset in the log
- assertThrows(classOf[UnexpectedAppendOffsetException], () => log.appendAsLeader(records(1), leaderEpoch, AppendOrigin.RAFT_LEADER))
-
- // When the first offset of the MemoryRecords to be appended equals the next offset in the log, the append succeeds
- log.appendAsLeader(records(3), leaderEpoch, AppendOrigin.RAFT_LEADER)
- assertEquals(6, log.logEndOffset)
- }
-
- @Test
- def testAppendInfoFirstOffset(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- val simpleRecords = List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
- )
-
- val records = TestUtils.records(simpleRecords)
-
- val firstAppendInfo = log.appendAsLeader(records, 0)
- assertEquals(0, firstAppendInfo.firstOffset)
-
- val secondAppendInfo = log.appendAsLeader(
- TestUtils.records(simpleRecords),
- 0
- )
- assertEquals(simpleRecords.size, secondAppendInfo.firstOffset)
-
- log.roll()
- val afterRollAppendInfo = log.appendAsLeader(TestUtils.records(simpleRecords), 0)
- assertEquals(simpleRecords.size * 2, afterRollAppendInfo.firstOffset)
- }
-
- @Test
- def testTruncateBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset((log, targetOffset) => log.truncateTo(targetOffset))
- }
-
- @Test
- def testTruncateFullyAndStartBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset((log, targetOffset) => log.truncateFullyAndStartAt(targetOffset, Optional.empty))
- }
-
- @Test
- def testTruncateFullyAndStart(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- val producerId = 17L
- val producerEpoch: Short = 10
- val sequence = 0
-
- log.appendAsLeader(TestUtils.records(List(
- new SimpleRecord("0".getBytes),
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )), 0)
-
- log.appendAsLeader(MemoryRecords.withTransactionalRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("3".getBytes),
- new SimpleRecord("4".getBytes)
- ), 0)
-
- assertEquals(Optional.of(3L), log.firstUnstableOffset)
-
- // We close and reopen the log to ensure that the first unstable offset segment
- // position will be undefined when we truncate the log.
- log.close()
-
- val reopened = createLog(logDir, logConfig)
- assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
-
- reopened.truncateFullyAndStartAt(2L, Optional.of(1L))
- assertEquals(Optional.empty, reopened.firstUnstableOffset)
- assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
- assertEquals(1L, reopened.logStartOffset)
- assertEquals(2L, reopened.logEndOffset)
- }
-
- private def testTruncateBelowFirstUnstableOffset(truncateFunc: (UnifiedLog, Long) => Unit): Unit = {
- // Verify that truncation below the first unstable offset correctly
- // resets the producer state. Specifically we are testing the case when
- // the segment position of the first unstable offset is unknown.
-
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- val producerId = 17L
- val producerEpoch: Short = 10
- val sequence = 0
-
- log.appendAsLeader(TestUtils.records(List(
- new SimpleRecord("0".getBytes),
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )), 0)
-
- log.appendAsLeader(MemoryRecords.withTransactionalRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("3".getBytes),
- new SimpleRecord("4".getBytes)
- ), 0)
-
- assertEquals(Optional.of(3L), log.firstUnstableOffset)
-
- // We close and reopen the log to ensure that the first unstable offset segment
- // position will be undefined when we truncate the log.
- log.close()
-
- val reopened = createLog(logDir, logConfig)
- assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
-
- truncateFunc(reopened, 0L)
- assertEquals(Optional.empty, reopened.firstUnstableOffset)
- assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
- }
-
- @Test
- def testHighWatermarkMaintenance(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
- val leaderEpoch = 0
-
- def records(offset: Long): MemoryRecords = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
- ), baseOffset = offset, partitionLeaderEpoch = leaderEpoch)
-
- def assertHighWatermark(offset: Long): Unit = {
- assertEquals(offset, log.highWatermark)
- assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
- }
-
- // High watermark initialized to 0
- assertHighWatermark(0L)
-
- // High watermark not changed by append
- log.appendAsLeader(records(0), leaderEpoch)
- assertHighWatermark(0L)
-
- // Update high watermark as leader
- log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L))
- assertHighWatermark(1L)
-
- // Cannot update past the log end offset
- log.updateHighWatermark(5L)
- assertHighWatermark(3L)
-
- // Update high watermark as follower
- log.appendAsFollower(records(3L), leaderEpoch)
- log.updateHighWatermark(6L)
- assertHighWatermark(6L)
-
- // High watermark should be adjusted by truncation
- log.truncateTo(3L)
- assertHighWatermark(3L)
-
- log.appendAsLeader(records(0L), 0)
- assertHighWatermark(3L)
- assertEquals(6L, log.logEndOffset)
- assertEquals(0L, log.logStartOffset)
-
- // Full truncation should also reset high watermark
- log.truncateFullyAndStartAt(4L, Optional.empty)
- assertEquals(4L, log.logEndOffset)
- assertEquals(4L, log.logStartOffset)
- assertHighWatermark(4L)
- }
-
- private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation, batchBaseOffset: Long): Unit = {
- val readInfo = log.read(offset, Int.MaxValue, isolation, true)
-
- assertFalse(readInfo.firstEntryIncomplete)
- assertTrue(readInfo.records.sizeInBytes > 0)
-
- val upperBoundOffset = isolation match {
- case FetchIsolation.LOG_END => log.logEndOffset
- case FetchIsolation.HIGH_WATERMARK => log.highWatermark
- case FetchIsolation.TXN_COMMITTED => log.lastStableOffset
- }
-
- for (record <- readInfo.records.records.asScala)
- assertTrue(record.offset < upperBoundOffset)
-
- assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
- assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
- }
-
- private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation, batchBaseOffset: Long): Unit = {
- val readInfo = log.read(offset, Int.MaxValue, isolation, true)
- assertFalse(readInfo.firstEntryIncomplete)
- assertEquals(0, readInfo.records.sizeInBytes)
- assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
- assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
- }
-
- @Test
- def testFetchUpToLogEndOffset(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- log.appendAsLeader(TestUtils.records(List(
- new SimpleRecord("0".getBytes),
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )), 0)
- log.appendAsLeader(TestUtils.records(List(
- new SimpleRecord("3".getBytes),
- new SimpleRecord("4".getBytes)
- )), 0)
- val batchBaseOffsets = SortedSet[Long](0, 3, 5)
-
- (log.logStartOffset until log.logEndOffset).foreach { offset =>
- val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
- assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, batchBaseOffset)
- }
- }
-
- @Test
- def testFetchUpToHighWatermark(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- log.appendAsLeader(TestUtils.records(List(
- new SimpleRecord("0".getBytes),
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )), 0)
- log.appendAsLeader(TestUtils.records(List(
- new SimpleRecord("3".getBytes),
- new SimpleRecord("4".getBytes)
- )), 0)
- val batchBaseOffsets = SortedSet[Long](0, 3, 5)
-
- def assertHighWatermarkBoundedFetches(): Unit = {
- (log.logStartOffset until log.highWatermark).foreach { offset =>
- val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
- assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset)
- }
-
- (log.highWatermark to log.logEndOffset).foreach { offset =>
- val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
- assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset)
- }
- }
-
- assertHighWatermarkBoundedFetches()
-
- log.updateHighWatermark(3L)
- assertHighWatermarkBoundedFetches()
-
- log.updateHighWatermark(5L)
- assertHighWatermarkBoundedFetches()
- }
-
- @Test
- def testActiveProducers(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- def assertProducerState(
- producerId: Long,
- producerEpoch: Short,
- lastSequence: Int,
- currentTxnStartOffset: Option[Long],
- coordinatorEpoch: Option[Int]
- ): Unit = {
- val producerStateOpt = log.activeProducers.asScala.find(_.producerId == producerId)
- assertTrue(producerStateOpt.isDefined)
-
- val producerState = producerStateOpt.get
- assertEquals(producerEpoch, producerState.producerEpoch)
- assertEquals(lastSequence, producerState.lastSequence)
- assertEquals(currentTxnStartOffset.getOrElse(-1L), producerState.currentTxnStartOffset)
- assertEquals(coordinatorEpoch.getOrElse(-1), producerState.coordinatorEpoch)
- }
-
- // Test transactional producer state (open transaction)
- val producer1Epoch = 5.toShort
- val producerId1 = 1L
- LogTestUtils.appendTransactionalAsLeader(log, producerId1, producer1Epoch, mockTime)(5)
- assertProducerState(
- producerId1,
- producer1Epoch,
- lastSequence = 4,
- currentTxnStartOffset = Some(0L),
- coordinatorEpoch = None
- )
-
- // Test transactional producer state (closed transaction)
- val coordinatorEpoch = 15
- LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, producer1Epoch, ControlRecordType.COMMIT,
- mockTime.milliseconds(), coordinatorEpoch, leaderEpoch = 0, transactionVersion = TransactionVersion.TV_0.featureLevel())
- assertProducerState(
- producerId1,
- producer1Epoch,
- lastSequence = 4,
- currentTxnStartOffset = None,
- coordinatorEpoch = Some(coordinatorEpoch)
- )
-
- // Test idempotent producer state
- val producer2Epoch = 5.toShort
- val producerId2 = 2L
- LogTestUtils.appendIdempotentAsLeader(log, producerId2, producer2Epoch, mockTime)(3)
- assertProducerState(
- producerId2,
- producer2Epoch,
- lastSequence = 2,
- currentTxnStartOffset = None,
- coordinatorEpoch = None
- )
- }
-
- @Test
- def testFetchUpToLastStableOffset(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
- val epoch = 0.toShort
-
- val producerId1 = 1L
- val producerId2 = 2L
-
- val appendProducer1 = LogTestUtils.appendTransactionalAsLeader(log, producerId1, epoch, mockTime)
- val appendProducer2 = LogTestUtils.appendTransactionalAsLeader(log, producerId2, epoch, mockTime)
-
- appendProducer1(5)
- LogTestUtils.appendNonTransactionalAsLeader(log, 3)
- appendProducer2(2)
- appendProducer1(4)
- LogTestUtils.appendNonTransactionalAsLeader(log, 2)
- appendProducer1(10)
-
- val batchBaseOffsets = SortedSet[Long](0, 5, 8, 10, 14, 16, 26, 27, 28)
-
- def assertLsoBoundedFetches(): Unit = {
- (log.logStartOffset until log.lastStableOffset).foreach { offset =>
- val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
- assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset)
- }
-
- (log.lastStableOffset to log.logEndOffset).foreach { offset =>
- val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
- assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset)
- }
- }
-
- assertLsoBoundedFetches()
-
- log.updateHighWatermark(log.logEndOffset)
- assertLsoBoundedFetches()
-
- LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, epoch, ControlRecordType.COMMIT, mockTime.milliseconds(),
- transactionVersion = TransactionVersion.TV_0.featureLevel())
- assertEquals(0L, log.lastStableOffset)
-
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(8L, log.lastStableOffset)
- assertLsoBoundedFetches()
-
- LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
- transactionVersion = TransactionVersion.TV_0.featureLevel())
- assertEquals(8L, log.lastStableOffset)
-
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(log.logEndOffset, log.lastStableOffset)
- assertLsoBoundedFetches()
- }
-
- /**
- * Tests for time based log roll. This test appends messages then changes the time
- * using the mock clock to force the log to roll and checks the number of segments.
- */
- @Test
- def testTimeBasedLogRollDuringAppend(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
-
- // create a log
- val log = createLog(logDir, logConfig, producerStateManagerConfig = new ProducerStateManagerConfig(24 * 60, false))
- assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
- // Test the segment rolling behavior when messages do not have a timestamp.
- mockTime.sleep(log.config.segmentMs + 1)
- log.appendAsLeader(createRecords, 0)
- assertEquals(1, log.numberOfSegments, "Log doesn't roll if doing so creates an empty segment.")
-
- log.appendAsLeader(createRecords, 0)
- assertEquals(2, log.numberOfSegments, "Log rolls on this append since time has expired.")
-
- for (numSegments <- 3 until 5) {
- mockTime.sleep(log.config.segmentMs + 1)
- log.appendAsLeader(createRecords, 0)
- assertEquals(numSegments, log.numberOfSegments, "Changing time beyond rollMs and appending should create a new segment.")
- }
-
- // Append a message with a timestamp to a segment whose first message does not have a timestamp.
- val timestamp = mockTime.milliseconds + log.config.segmentMs + 1
- def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp)
- log.appendAsLeader(createRecordsWithTimestamp, 0)
- assertEquals(4, log.numberOfSegments, "Segment should not have been rolled out because the log rolling should be based on wall clock.")
-
- // Test the segment rolling behavior when messages have timestamps.
- mockTime.sleep(log.config.segmentMs + 1)
- log.appendAsLeader(createRecordsWithTimestamp, 0)
- assertEquals(5, log.numberOfSegments, "A new segment should have been rolled out")
-
- // move the wall clock beyond log rolling time
- mockTime.sleep(log.config.segmentMs + 1)
- log.appendAsLeader(createRecordsWithTimestamp, 0)
- assertEquals(5, log.numberOfSegments, "Log should not roll because the roll should depend on timestamp of the first message.")
-
- val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
- log.appendAsLeader(recordWithExpiredTimestamp, 0)
- assertEquals(6, log.numberOfSegments, "Log should roll because the timestamp in the message should make the log segment expire.")
-
- val numSegments = log.numberOfSegments
- mockTime.sleep(log.config.segmentMs + 1)
- log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE), 0)
- assertEquals(numSegments, log.numberOfSegments, "Appending an empty message set should not roll log even if sufficient time has passed.")
- }
-
- @Test
- def testRollSegmentThatAlreadyExists(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
- val partitionLeaderEpoch = 0
-
- // create a log
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
-
- // roll active segment with the same base offset of size zero should recreate the segment
- log.roll(Optional.of(0L))
- assertEquals(1, log.numberOfSegments, "Expect 1 segment after roll() empty segment with base offset.")
-
- // should be able to append records to active segment
- val records = TestUtils.records(
- List(new SimpleRecord(mockTime.milliseconds, "k1".getBytes, "v1".getBytes)),
- baseOffset = 0L, partitionLeaderEpoch = partitionLeaderEpoch)
- log.appendAsFollower(records, partitionLeaderEpoch)
- assertEquals(1, log.numberOfSegments, "Expect one segment.")
- assertEquals(0L, log.activeSegment.baseOffset)
-
- // make sure we can append more records
- val records2 = TestUtils.records(
- List(new SimpleRecord(mockTime.milliseconds + 10, "k2".getBytes, "v2".getBytes)),
- baseOffset = 1L, partitionLeaderEpoch = partitionLeaderEpoch)
- log.appendAsFollower(records2, partitionLeaderEpoch)
-
- assertEquals(2, log.logEndOffset, "Expect two records in the log")
- assertEquals(0, LogTestUtils.readLog(log, 0, 1).records.batches.iterator.next().lastOffset)
- assertEquals(1, LogTestUtils.readLog(log, 1, 1).records.batches.iterator.next().lastOffset)
-
- // roll so that active segment is empty
- log.roll()
- assertEquals(2L, log.activeSegment.baseOffset, "Expect base offset of active segment to be LEO")
- assertEquals(2, log.numberOfSegments, "Expect two segments.")
-
- // manually resize offset index to force roll of an empty active segment on next append
- log.activeSegment.offsetIndex.resize(0)
- val records3 = TestUtils.records(
- List(new SimpleRecord(mockTime.milliseconds + 12, "k3".getBytes, "v3".getBytes)),
- baseOffset = 2L, partitionLeaderEpoch = partitionLeaderEpoch)
- log.appendAsFollower(records3, partitionLeaderEpoch)
- assertTrue(log.activeSegment.offsetIndex.maxEntries > 1)
- assertEquals(2, LogTestUtils.readLog(log, 2, 1).records.batches.iterator.next().lastOffset)
- assertEquals(2, log.numberOfSegments, "Expect two segments.")
- }
-
- @Test
- def testNonSequentialAppend(): Unit = {
- // create a log
- val log = createLog(logDir, new LogConfig(new Properties))
- val pid = 1L
- val epoch: Short = 0
-
- val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 0)
- log.appendAsLeader(records, 0)
-
- val nextRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 2)
- assertThrows(classOf[OutOfOrderSequenceException], () => log.appendAsLeader(nextRecords, 0))
- }
-
- @Test
- def testTruncateToEndOffsetClearsEpochCache(): Unit = {
- val log = createLog(logDir, new LogConfig(new Properties))
-
- // Seed some initial data in the log
- val records = TestUtils.records(List(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)),
- baseOffset = 27)
- appendAsFollower(log, records, 19)
- assertEquals(Optional.of(new EpochEntry(19, 27)), log.leaderEpochCache.latestEntry)
- assertEquals(29, log.logEndOffset)
-
- def verifyTruncationClearsEpochCache(epoch: Int, truncationOffset: Long): Unit = {
- // Simulate becoming a leader
- log.assignEpochStartOffset(epoch, log.logEndOffset)
- assertEquals(Optional.of(new EpochEntry(epoch, 29)), log.leaderEpochCache.latestEntry)
- assertEquals(29, log.logEndOffset)
-
- // Now we become the follower and truncate to an offset greater
- // than or equal to the log end offset. The trivial epoch entry
- // at the end of the log should be gone
- log.truncateTo(truncationOffset)
- assertEquals(Optional.of(new EpochEntry(19, 27)), log.leaderEpochCache.latestEntry)
- assertEquals(29, log.logEndOffset)
- }
-
- // Truncations greater than or equal to the log end offset should
- // clear the epoch cache
- verifyTruncationClearsEpochCache(epoch = 20, truncationOffset = log.logEndOffset)
- verifyTruncationClearsEpochCache(epoch = 24, truncationOffset = log.logEndOffset + 1)
- }
-
- /**
- * Test the values returned by the logSegments call
- */
- @Test
- def testLogSegmentsCallCorrect(): Unit = {
- // Create 3 segments and make sure we get the right values from various logSegments calls.
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
- def getSegmentOffsets(log: UnifiedLog, from: Long, to: Long) = log.logSegments(from, to).stream().map { _.baseOffset }.toList
- val setSize = createRecords.sizeInBytes
- val msgPerSeg = 10
- val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
- // create a log
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
- // segments expire in size
- for (_ <- 1 to (2 * msgPerSeg + 2))
- log.appendAsLeader(createRecords, 0)
- assertEquals(3, log.numberOfSegments, "There should be exactly 3 segments.")
-
- // from == to should always be empty
- assertEquals(util.List.of(), getSegmentOffsets(log, 10, 10))
- assertEquals(util.List.of(), getSegmentOffsets(log, 15, 15))
-
- assertEquals(util.List.of(0L, 10L, 20L), getSegmentOffsets(log, 0, 21))
-
- assertEquals(util.List.of(0L), getSegmentOffsets(log, 1, 5))
- assertEquals(util.List.of(10L, 20L), getSegmentOffsets(log, 13, 21))
- assertEquals(util.List.of(10L), getSegmentOffsets(log, 13, 17))
-
- // from > to is invalid
- assertThrows(classOf[IllegalArgumentException], () => log.logSegments(10, 0))
- }
-
- @Test
- def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
- // simulate the upgrade path by creating a new log with several segments, deleting the
- // snapshot files, and then reloading the log
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 64 * 10)
- var log = createLog(logDir, logConfig)
- assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset)
-
- for (i <- 0 to 100) {
- val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
- log.appendAsLeader(TestUtils.records(List(record)), 0)
- }
- assertTrue(log.logSegments.size >= 2)
- val logEndOffset = log.logEndOffset
- log.close()
-
- LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
- // Reload after clean shutdown
- log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
- var expectedSnapshotOffsets = log.logSegments.asScala.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset
- assertEquals(expectedSnapshotOffsets, LogTestUtils.listProducerSnapshotOffsets(logDir))
- log.close()
-
- LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
- // Reload after unclean shutdown with recoveryPoint set to log end offset
- log = createLog(logDir, logConfig, recoveryPoint = logEndOffset, lastShutdownClean = false)
- assertEquals(expectedSnapshotOffsets, LogTestUtils.listProducerSnapshotOffsets(logDir))
- log.close()
-
- LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
- // Reload after unclean shutdown with recoveryPoint set to 0
- log = createLog(logDir, logConfig, recoveryPoint = 0L, lastShutdownClean = false)
- // We progressively create a snapshot for each segment after the recovery point
- expectedSnapshotOffsets = log.logSegments.asScala.map(_.baseOffset).tail.toVector :+ log.logEndOffset
- assertEquals(expectedSnapshotOffsets, LogTestUtils.listProducerSnapshotOffsets(logDir))
- log.close()
- }
-
- @Test
- def testLogReinitializeAfterManualDelete(): Unit = {
- val logConfig = LogTestUtils.createLogConfig()
- // simulate a case where log data does not exist but the start offset is non-zero
- val log = createLog(logDir, logConfig, logStartOffset = 500)
- assertEquals(500, log.logStartOffset)
- assertEquals(500, log.logEndOffset)
- }
-
- /**
- * Test that the "PeriodicProducerExpirationCheck" scheduled task gets canceled after the log
- * is deleted.
- */
- @Test
- def testProducerExpireCheckAfterDelete(): Unit = {
- val scheduler = new KafkaScheduler(1)
- try {
- scheduler.startup()
- val logConfig = LogTestUtils.createLogConfig()
- val log = createLog(logDir, logConfig, scheduler = scheduler)
-
- val producerExpireCheck = log.producerExpireCheck
- assertTrue(scheduler.taskRunning(producerExpireCheck), "producerExpireCheck isn't part of the scheduled tasks")
-
- log.delete()
- assertFalse(scheduler.taskRunning(producerExpireCheck),
- "producerExpireCheck is part of scheduled tasks even after log
deletion")
- } finally {
- scheduler.shutdown()
- }
- }
-
- @Test
- def testProducerIdMapOffsetUpdatedForNonIdempotentData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
- log.appendAsLeader(records, 0)
- log.takeProducerSnapshot()
- assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset)
- }
-
- @Test
- def testRebuildProducerIdMapWithCompactedData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- val pid = 1L
- val producerEpoch = 0.toShort
- val partitionLeaderEpoch = 0
- val seq = 0
- val baseOffset = 23L
-
- // create a batch with a couple gaps to simulate compaction
- val records = TestUtils.records(
- producerId = pid,
- producerEpoch = producerEpoch,
- sequence = seq,
- baseOffset = baseOffset,
- records = List(
- new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)
- )
- )
- records.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-
- val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new RecordFilter(0, 0) {
- override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
- new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)
- override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
- }, filtered, BufferSupplier.NO_CACHING)
- filtered.flip()
- val filteredRecords = MemoryRecords.readableRecords(filtered)
-
- log.appendAsFollower(filteredRecords, partitionLeaderEpoch)
-
- // append some more data and then truncate to force rebuilding of the PID map
- val moreRecords = TestUtils.records(
- baseOffset = baseOffset + 4,
- records = List(
- new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "f".getBytes)
- )
- )
- moreRecords.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
- log.appendAsFollower(moreRecords, partitionLeaderEpoch)
-
- log.truncateTo(baseOffset + 4)
-
- val activeProducers = log.activeProducersWithLastSequence
- assertTrue(activeProducers.containsKey(pid))
-
- val lastSeq = activeProducers.get(pid)
- assertEquals(3, lastSeq)
- }
-
- @Test
- def testRebuildProducerStateWithEmptyCompactedBatch(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- val pid = 1L
- val producerEpoch = 0.toShort
- val partitionLeaderEpoch = 0
- val seq = 0
- val baseOffset = 23L
-
- // create an empty batch
- val records = TestUtils.records(
- producerId = pid,
- producerEpoch = producerEpoch,
- sequence = seq,
- baseOffset = baseOffset,
- records = List(
- new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "a".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes)
- )
- )
- records.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-
- val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new RecordFilter(0, 0) {
- override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
- new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.RETAIN_EMPTY, true)
- override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = false
- }, filtered, BufferSupplier.NO_CACHING)
- filtered.flip()
- val filteredRecords = MemoryRecords.readableRecords(filtered)
-
- log.appendAsFollower(filteredRecords, partitionLeaderEpoch)
-
- // append some more data and then truncate to force rebuilding of the PID map
- val moreRecords = TestUtils.records(
- baseOffset = baseOffset + 2,
- records = List(
- new SimpleRecord(mockTime.milliseconds(), "e".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "f".getBytes)
- )
- )
- moreRecords.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
- log.appendAsFollower(moreRecords, partitionLeaderEpoch)
-
- log.truncateTo(baseOffset + 2)
-
- val activeProducers = log.activeProducersWithLastSequence
- assertTrue(activeProducers.containsKey(pid))
-
- val lastSeq = activeProducers.get(pid)
- assertEquals(1, lastSeq)
- }
-
- @Test
- def testUpdateProducerIdMapWithCompactedData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- val pid = 1L
- val producerEpoch = 0.toShort
- val partitionLeaderEpoch = 0
- val seq = 0
- val baseOffset = 23L
-
- // create a batch with a couple gaps to simulate compaction
- val records = TestUtils.records(
- producerId = pid,
- producerEpoch = producerEpoch,
- sequence = seq,
- baseOffset = baseOffset,
- records = List(
- new SimpleRecord(mockTime.milliseconds(), "a".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "b".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "c".getBytes),
- new SimpleRecord(mockTime.milliseconds(), "key".getBytes, "d".getBytes)
- )
- )
- records.batches.forEach(_.setPartitionLeaderEpoch(partitionLeaderEpoch))
-
- val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new RecordFilter(0, 0) {
- override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
- new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)
- override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
- }, filtered, BufferSupplier.NO_CACHING)
- filtered.flip()
- val filteredRecords = MemoryRecords.readableRecords(filtered)
-
- log.appendAsFollower(filteredRecords, partitionLeaderEpoch)
- val activeProducers = log.activeProducersWithLastSequence
- assertTrue(activeProducers.containsKey(pid))
-
- val lastSeq = activeProducers.get(pid)
- assertEquals(3, lastSeq)
- }
-
- @Test
- def testProducerIdMapTruncateTo(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), 0)
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), 0)
- log.takeProducerSnapshot()
-
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes))), 0)
- log.takeProducerSnapshot()
-
- log.truncateTo(2)
- assertEquals(OptionalLong.of(2), log.latestProducerSnapshotOffset)
- assertEquals(2, log.latestProducerStateEndOffset)
-
- log.truncateTo(1)
- assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset)
- assertEquals(1, log.latestProducerStateEndOffset)
-
- log.truncateTo(0)
- assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset)
- assertEquals(0, log.latestProducerStateEndOffset)
- }
-
- @Test
- def testProducerIdMapTruncateToWithNoSnapshots(): Unit = {
- // This ensures that the upgrade optimization path cannot be hit after initial loading
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- val pid = 1L
- val epoch = 0.toShort
-
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid,
- producerEpoch = epoch, sequence = 0), 0)
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid,
- producerEpoch = epoch, sequence = 1), 0)
-
- LogTestUtils.deleteProducerSnapshotFiles(logDir)
-
- log.truncateTo(1L)
- assertEquals(1, log.activeProducersWithLastSequence.size)
-
- val lastSeq = log.activeProducersWithLastSequence.get(pid)
- assertEquals(0, lastSeq)
- }
-
- @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with createEmptyActiveSegment: {0}")
- @ValueSource(booleans = Array(true, false))
- def testRetentionDeletesProducerStateSnapshots(createEmptyActiveSegment: Boolean): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
- val log = createLog(logDir, logConfig)
- val pid1 = 1L
- val epoch = 0.toShort
-
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1,
- producerEpoch = epoch, sequence = 0), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1,
- producerEpoch = epoch, sequence = 1), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1,
- producerEpoch = epoch, sequence = 2), 0)
- if (createEmptyActiveSegment) {
- log.roll()
- }
-
- log.updateHighWatermark(log.logEndOffset)
-
- val numProducerSnapshots = if (createEmptyActiveSegment) 3 else 2
- assertEquals(numProducerSnapshots, ProducerStateManager.listSnapshotFiles(logDir).size)
- // Sleep to breach the retention period
- mockTime.sleep(1000 * 60 + 1)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
- // Sleep to breach the file delete delay and run scheduled file deletion tasks
- mockTime.sleep(1)
- assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
- "expect a single producer state snapshot remaining")
- assertEquals(3, log.logStartOffset)
- }
-
@Test
def testRetentionIdempotency(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 974913b91be..ed30e25032e 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -28,14 +28,21 @@ import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.RequestLocal;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class LogTestUtils {
public static LogSegment createSegment(long offset, File logDir, int indexIntervalBytes, Time time) throws IOException {
@@ -146,16 +153,98 @@ public class LogTestUtils {
short producerEpoch,
int sequence,
long baseOffset,
- int partitionLeaderEpoch) {
+ int partitionLeaderEpoch,
+ long timestamp) {
ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset,
- System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch);
+ timestamp, producerId, producerEpoch, sequence, false, partitionLeaderEpoch);
records.forEach(builder::append);
return builder.build();
}
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression codec,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ return records(records, magicValue, codec, producerId, producerEpoch, sequence, baseOffset, partitionLeaderEpoch, System.currentTimeMillis());
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, producerId, producerEpoch, sequence, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long timestamp) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH, timestamp);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long baseOffset, int partitionLeaderEpoch) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+ }
+
+ public static void deleteProducerSnapshotFiles(File logDir) {
+ Stream.of(logDir.listFiles())
+ .filter(f -> f.isFile() && f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
+ .forEach(f -> assertDoesNotThrow(() -> Utils.delete(f)));
+ }
+
+ public static List<Long> listProducerSnapshotOffsets(File logDir) throws IOException {
+ return ProducerStateManager.listSnapshotFiles(logDir).stream().map(f -> f.offset).sorted().toList();
+ }
+
+ public static void appendNonTransactionalAsLeader(UnifiedLog log, int numRecords) throws IOException {
+ List<SimpleRecord> simpleRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ simpleRecords.add(new SimpleRecord(String.valueOf(i).getBytes()));
+ }
+ MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, simpleRecords.toArray(new SimpleRecord[0]));
+ log.appendAsLeader(records, 0);
+ }
+
+ public static Consumer<Integer> appendTransactionalAsLeader(UnifiedLog log,
+ long producerId,
+ short producerEpoch,
+ Time time) {
+ return appendIdempotentAsLeader(log, producerId, producerEpoch, time, true);
+ }
+
+ public static Consumer<Integer> appendIdempotentAsLeader(UnifiedLog log,
+ long producerId,
+ short producerEpoch,
+ Time time,
+ boolean isTransactional) {
+ final AtomicInteger sequence = new AtomicInteger(0);
+ return numRecords -> {
+ int baseSequence = sequence.get();
+ List<SimpleRecord> simpleRecords = new ArrayList<>();
+ for (int i = baseSequence; i < baseSequence + numRecords; i++) {
+ simpleRecords.add(new SimpleRecord(time.milliseconds(), String.valueOf(i).getBytes()));
+ }
+
+ MemoryRecords records = isTransactional
+ ? MemoryRecords.withTransactionalRecords(Compression.NONE, producerId,
+ producerEpoch, baseSequence, simpleRecords.toArray(new SimpleRecord[0]))
+ : MemoryRecords.withIdempotentRecords(Compression.NONE, producerId,
+ producerEpoch, baseSequence, simpleRecords.toArray(new SimpleRecord[0]));
+
+ assertDoesNotThrow(() -> log.appendAsLeader(records, 0));
+ sequence.addAndGet(numRecords);
+ };
+ }
+
public static class LogConfigBuilder {
private final Map<String, Object> configs = new HashMap<>();
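
A minimal usage sketch for the sequence-tracking helpers above (assuming the test class's createLog factory and mockTime field, as in UnifiedLogTest): each returned Consumer keeps its own sequence counter, so successive calls append batches with contiguous, monotonically increasing sequence numbers.

    UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
    Consumer<Integer> appendProducer = LogTestUtils.appendTransactionalAsLeader(log, 1L, (short) 0, mockTime);
    appendProducer.accept(5); // appends records with sequences 0-4
    appendProducer.accept(4); // appends records with sequences 5-8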
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index efef8c8ee4c..e8d1b01391c 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -18,18 +18,24 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.DefaultRecordBatch;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.server.common.TransactionVersion;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException;
+import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@@ -42,6 +48,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
@@ -49,13 +56,22 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Properties;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
@@ -63,6 +79,10 @@ import static org.mockito.Mockito.spy;
public class UnifiedLogTest {
+ private static final int ONE_MB = 1024 * 1024;
+ private static final int TEN_KB = 2048 * 5;
+ private static final long ONE_HOUR = 60 * 60L;
+
private final File tmpDir = TestUtils.tempDirectory();
private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
@@ -77,7 +97,7 @@ public class UnifiedLogTest {
public void tearDown() throws IOException {
brokerTopicStats.close();
for (UnifiedLog log : logsToClose) {
- log.close();
+ Utils.closeQuietly(log, "UnifiedLog");
}
Utils.delete(tmpDir);
}
@@ -570,7 +590,7 @@ public class UnifiedLogTest {
@Test
public void testFirstUnstableOffsetNoTransactionalData() throws IOException {
LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
- .segmentBytes(1024 * 1024 * 5)
+ .segmentBytes(5 * ONE_MB)
.build();
log = createLog(logDir, logConfig);
@@ -586,7 +606,7 @@ public class UnifiedLogTest {
@Test
public void testFirstUnstableOffsetWithTransactionalData() throws IOException {
LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
- .segmentBytes(1024 * 1024 * 5)
+ .segmentBytes(5 * ONE_MB)
.build();
log = createLog(logDir, logConfig);
@@ -622,6 +642,988 @@ public class UnifiedLogTest {
assertEquals(Optional.empty(), log.firstUnstableOffset());
}
+ @Test
+ public void testHighWatermarkMetadataUpdatedAfterSegmentRoll() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ MemoryRecords records = LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
+ ));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 0L, 0, List.of());
+
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 1L, 2L));
+
+ log.roll();
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 1L, 2L));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 3L, 0, List.of());
+ }
+
+ private void assertFetchSizeAndOffsets(UnifiedLog log, long fetchOffset, int expectedSize, List<Long> expectedOffsets) throws IOException {
+ FetchDataInfo readInfo = log.read(
+ fetchOffset,
+ 2048,
+ FetchIsolation.HIGH_WATERMARK,
+ false);
+ assertEquals(expectedSize, readInfo.records.sizeInBytes());
+ List<Long> actualOffsets = new ArrayList<>();
+ readInfo.records.records().forEach(record -> actualOffsets.add(record.offset()));
+ assertEquals(expectedOffsets, actualOffsets);
+ }
+
+ @Test
+ public void testAppendAsLeaderWithRaftLeader() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ int leaderEpoch = 0;
+
+ Function<Long, MemoryRecords> records = offset -> LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
+ ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+ log.appendAsLeader(records.apply(0L), leaderEpoch, AppendOrigin.RAFT_LEADER);
+ assertEquals(0, log.logStartOffset());
+ assertEquals(3L, log.logEndOffset());
+
+ // Since the raft leader is responsible for assigning offsets and the LogValidator is bypassed for performance reasons,
+ // the first offset of the MemoryRecords to be appended must equal the next offset in the log
+ assertThrows(UnexpectedAppendOffsetException.class, () -> log.appendAsLeader(records.apply(1L), leaderEpoch, AppendOrigin.RAFT_LEADER));
+
+ // When the first offset of the MemoryRecords to be appended equals the next offset in the log, the append succeeds
+ log.appendAsLeader(records.apply(3L), leaderEpoch, AppendOrigin.RAFT_LEADER);
+ assertEquals(6, log.logEndOffset());
+ }
+
+ @Test
+ public void testAppendInfoFirstOffset() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ List<SimpleRecord> simpleRecords = List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
+ );
+
+ MemoryRecords records = LogTestUtils.records(simpleRecords);
+
+ LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+ assertEquals(0, firstAppendInfo.firstOffset());
+
+ LogAppendInfo secondAppendInfo = log.appendAsLeader(
+ LogTestUtils.records(simpleRecords),
+ 0
+ );
+ assertEquals(simpleRecords.size(), secondAppendInfo.firstOffset());
+
+ log.roll();
+ LogAppendInfo afterRollAppendInfo = log.appendAsLeader(LogTestUtils.records(simpleRecords), 0);
+ assertEquals(simpleRecords.size() * 2, afterRollAppendInfo.firstOffset());
+ }
+
+ @Test
+ public void testTruncateBelowFirstUnstableOffset() throws IOException {
+ testTruncateBelowFirstUnstableOffset(UnifiedLog::truncateTo);
+ }
+
+ @Test
+ public void testTruncateFullyAndStartBelowFirstUnstableOffset() throws IOException {
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) -> log.truncateFullyAndStartAt(targetOffset, Optional.empty()));
+ }
+
+ @Test
+ public void testTruncateFullyAndStart() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long producerId = 17L;
+ short producerEpoch = 10;
+ int sequence = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ ), 0);
+ assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+ // We close and reopen the log to ensure that the first unstable offset segment
+ // position will be undefined when we truncate the log.
+ log.close();
+
+ UnifiedLog reopened = createLog(logDir, logConfig);
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager().firstUnstableOffset());
+
+ reopened.truncateFullyAndStartAt(2L, Optional.of(1L));
+ assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+ assertEquals(Map.of(), reopened.producerStateManager().activeProducers());
+ assertEquals(1L, reopened.logStartOffset());
+ assertEquals(2L, reopened.logEndOffset());
+ }
+
+ private void testTruncateBelowFirstUnstableOffset(BiConsumer<UnifiedLog, Long> truncateFunc) throws IOException {
+ // Verify that truncation below the first unstable offset correctly
+ // resets the producer state. Specifically we are testing the case when
+ // the segment position of the first unstable offset is unknown.
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long producerId = 17L;
+ short producerEpoch = 10;
+ int sequence = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ ), 0);
+ assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+ // We close and reopen the log to ensure that the first unstable
offset segment
+ // position will be undefined when we truncate the log.
+ log.close();
+
+ UnifiedLog reopened = createLog(logDir, logConfig);
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)),
reopened.producerStateManager().firstUnstableOffset());
+
+ truncateFunc.accept(reopened, 0L);
+ assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+ assertEquals(Map.of(),
reopened.producerStateManager().activeProducers());
+ }
+
+ @Test
+ public void testHighWatermarkMaintenance() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ int leaderEpoch = 0;
+
+ Function<Long, MemoryRecords> records = offset ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+ // High watermark initialized to 0
+ assertHighWatermark(log, 0L);
+
+ // High watermark not changed by append
+ log.appendAsLeader(records.apply(0L), leaderEpoch);
+ assertHighWatermark(log, 0L);
+
+ // Update high watermark as leader
+ log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
+ assertHighWatermark(log, 1L);
+
+ // Cannot update past the log end offset
+ log.updateHighWatermark(5L);
+ assertHighWatermark(log, 3L);
+
+ // Update high watermark as follower
+ log.appendAsFollower(records.apply(3L), leaderEpoch);
+ log.updateHighWatermark(6L);
+ assertHighWatermark(log, 6L);
+
+ // High watermark should be adjusted by truncation
+ log.truncateTo(3L);
+ assertHighWatermark(log, 3L);
+
+ log.appendAsLeader(records.apply(0L), 0);
+ assertHighWatermark(log, 3L);
+ assertEquals(6L, log.logEndOffset());
+ assertEquals(0L, log.logStartOffset());
+
+ // Full truncation should also reset high watermark
+ log.truncateFullyAndStartAt(4L, Optional.empty());
+ assertEquals(4L, log.logEndOffset());
+ assertEquals(4L, log.logStartOffset());
+ assertHighWatermark(log, 4L);
+ }
+
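
A minimal sketch of the leader-side pattern these assertions exercise; the helper name and the replicatedOffset parameter are assumptions for illustration, not APIs introduced by this change:

    // Minimal illustrative sketch: append as leader, then advance the high
    // watermark to the replication progress. As asserted above, the log caps
    // the high watermark at its current log end offset.
    private static void appendAndAdvanceHighWatermark(UnifiedLog log,
                                                      MemoryRecords records,
                                                      int leaderEpoch,
                                                      long replicatedOffset) {
        log.appendAsLeader(records, leaderEpoch);
        log.updateHighWatermark(Math.min(replicatedOffset, log.logEndOffset()));
    }
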
+ private void assertHighWatermark(UnifiedLog log, long offset) throws
IOException {
+ assertEquals(offset, log.highWatermark());
+ assertValidLogOffsetMetadata(log,
log.fetchOffsetSnapshot().highWatermark());
+ }
+
+ private void assertNonEmptyFetch(UnifiedLog log, long offset,
FetchIsolation isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertTrue(readInfo.records.sizeInBytes() > 0);
+
+ long upperBoundOffset = switch (isolation) {
+ case LOG_END -> log.logEndOffset();
+ case HIGH_WATERMARK -> log.highWatermark();
+ case TXN_COMMITTED -> log.lastStableOffset();
+ };
+
+ for (Record record : readInfo.records.records())
+ assertTrue(record.offset() < upperBoundOffset);
+
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ private void assertEmptyFetch(UnifiedLog log, long offset, FetchIsolation
isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertEquals(0, readInfo.records.sizeInBytes());
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ @Test
+ public void testFetchUpToLogEndOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ for (long offset = log.logStartOffset(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END,
batchBaseOffset);
+ }
+ }
+
+ @Test
+ public void testFetchUpToHighWatermark() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(3L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(5L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+ }
+
+ private void assertHighWatermarkBoundedFetches(UnifiedLog log,
TreeSet<Long> batchBaseOffsets) throws IOException {
+ for (long offset = log.logStartOffset(); offset < log.highWatermark();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
+ }
+
+ for (long offset = log.highWatermark(); offset <= log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
+ }
+ }
+
+ @Test
+ public void testActiveProducers() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // Test transactional producer state (open transaction)
+ short producer1Epoch = 5;
+ long producerId1 = 1L;
+ LogTestUtils.appendTransactionalAsLeader(log, producerId1,
producer1Epoch, mockTime).accept(5);
+ assertProducerState(
+ log,
+ producerId1,
+ producer1Epoch,
+ 4,
+ Optional.of(0L),
+ Optional.empty()
+ );
+
+ // Test transactional producer state (closed transaction)
+ int coordinatorEpoch = 15;
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1,
producer1Epoch, ControlRecordType.COMMIT,
+ mockTime.milliseconds(), coordinatorEpoch, 0,
TransactionVersion.TV_0.featureLevel());
+ assertProducerState(
+ log,
+ producerId1,
+ producer1Epoch,
+ 4,
+ Optional.empty(),
+ Optional.of(coordinatorEpoch)
+ );
+
+ // Test idempotent producer state
+ short producer2Epoch = 5;
+ long producerId2 = 2L;
+ LogTestUtils.appendIdempotentAsLeader(log, producerId2,
producer2Epoch, mockTime, false).accept(3);
+ assertProducerState(
+ log,
+ producerId2,
+ producer2Epoch,
+ 2,
+ Optional.empty(),
+ Optional.empty()
+ );
+ }
+
+ private void assertProducerState(
+ UnifiedLog log,
+ long producerId,
+ short producerEpoch,
+ int lastSequence,
+ Optional<Long> currentTxnStartOffset,
+ Optional<Integer> coordinatorEpoch
+ ) {
+ Optional<DescribeProducersResponseData.ProducerState> producerStateOpt
= log.activeProducers().stream().filter(p -> p.producerId() ==
producerId).findFirst();
+ assertTrue(producerStateOpt.isPresent());
+
+ DescribeProducersResponseData.ProducerState producerState =
producerStateOpt.get();
+ assertEquals(producerEpoch, producerState.producerEpoch());
+ assertEquals(lastSequence, producerState.lastSequence());
+ assertEquals(currentTxnStartOffset.orElse(-1L),
producerState.currentTxnStartOffset());
+ assertEquals(coordinatorEpoch.orElse(-1),
producerState.coordinatorEpoch());
+ }
+
+ @Test
+ public void testFetchUpToLastStableOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ short epoch = 0;
+
+ long producerId1 = 1L;
+ long producerId2 = 2L;
+
+ Consumer<Integer> appendProducer1 =
LogTestUtils.appendTransactionalAsLeader(log, producerId1, epoch, mockTime);
+ Consumer<Integer> appendProducer2 =
LogTestUtils.appendTransactionalAsLeader(log, producerId2, epoch, mockTime);
+
+ appendProducer1.accept(5);
+ LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+ appendProducer2.accept(2);
+ appendProducer1.accept(4);
+ LogTestUtils.appendNonTransactionalAsLeader(log, 2);
+ appendProducer1.accept(10);
+
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 5L, 8L,
10L, 14L, 16L, 26L, 27L, 28L));
+
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, epoch,
ControlRecordType.COMMIT, mockTime.milliseconds(),
+ 0, 0, TransactionVersion.TV_0.featureLevel());
+ assertEquals(0L, log.lastStableOffset());
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(8L, log.lastStableOffset());
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 0, 0, TransactionVersion.TV_0.featureLevel());
+ assertEquals(8L, log.lastStableOffset());
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(log.logEndOffset(), log.lastStableOffset());
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+ }
+
+ private void assertLsoBoundedFetches(UnifiedLog log, TreeSet<Long>
batchBaseOffsets) throws IOException {
+ for (long offset = log.logStartOffset(); offset <
log.lastStableOffset(); offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset);
+ }
+
+ for (long offset = log.lastStableOffset(); offset <=
log.logEndOffset(); offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset);
+ }
+ }
+
+ /**
+ * Tests for time-based log roll. This test appends messages, then advances
the time
+ * using the mock clock to force the log to roll, and checks the number of
segments.
+ */
+ @Test
+ public void testTimeBasedLogRollDuringAppend() throws IOException {
+ Supplier<MemoryRecords> createRecords = () ->
LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())));
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();
+
+ // create a log
+ UnifiedLog log = createLog(logDir, logConfig, 0L, 0L,
brokerTopicStats, mockTime.scheduler, mockTime,
+ new ProducerStateManagerConfig(24 * 60, false), true,
Optional.empty(), false);
+ assertEquals(1, log.numberOfSegments(), "Log begins with a single
empty segment.");
+ // Test the segment rolling behavior when messages do not have a
timestamp.
+ mockTime.sleep(log.config().segmentMs + 1);
+ log.appendAsLeader(createRecords.get(), 0);
+ assertEquals(1, log.numberOfSegments(), "Log doesn't roll if doing so
creates an empty segment.");
+
+ log.appendAsLeader(createRecords.get(), 0);
+ assertEquals(2, log.numberOfSegments(), "Log rolls on this append
since time has expired.");
+
+ for (int numSegments = 3; numSegments < 5; numSegments++) {
+ mockTime.sleep(log.config().segmentMs + 1);
+ log.appendAsLeader(createRecords.get(), 0);
+ assertEquals(numSegments, log.numberOfSegments(), "Changing time
beyond rollMs and appending should create a new segment.");
+ }
+
+ // Append a message with a timestamp to a segment whose first message does
not have a timestamp.
+ long timestamp = mockTime.milliseconds() + log.config().segmentMs + 1;
+ Supplier<MemoryRecords> recordWithTimestamp = () ->
LogTestUtils.records(List.of(new SimpleRecord(timestamp, "test".getBytes())));
+ log.appendAsLeader(recordWithTimestamp.get(), 0);
+ assertEquals(4, log.numberOfSegments(), "Segment should not have been
rolled out because the log rolling should be based on wall clock.");
+
+ // Test the segment rolling behavior when messages have timestamps.
+ mockTime.sleep(log.config().segmentMs + 1);
+ log.appendAsLeader(recordWithTimestamp.get(), 0);
+ assertEquals(5, log.numberOfSegments(), "A new segment should have
been rolled out");
+
+ // move the wall clock beyond log rolling time
+ mockTime.sleep(log.config().segmentMs + 1);
+ log.appendAsLeader(recordWithTimestamp.get(), 0);
+ assertEquals(5, log.numberOfSegments(), "Log should not roll because
the roll should depend on timestamp of the first message.");
+
+ Supplier<MemoryRecords> recordWithExpiredTimestamp = () ->
LogTestUtils.records(List.of(new SimpleRecord(mockTime.milliseconds(),
"test".getBytes())));
+ log.appendAsLeader(recordWithExpiredTimestamp.get(), 0);
+ assertEquals(6, log.numberOfSegments(), "Log should roll because the
timestamp in the message should make the log segment expire.");
+
+ int numSegments = log.numberOfSegments();
+ mockTime.sleep(log.config().segmentMs + 1);
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE), 0);
+ assertEquals(numSegments, log.numberOfSegments(), "Appending an empty
message set should not roll log even if sufficient time has passed.");
+ }
+
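
A rough sketch of the time-based roll condition the test above exercises; the method below is an illustrative simplification, not the actual LogSegment logic (which, for example, also applies roll jitter):

    // Illustrative simplification: a segment rolls once more than segment.ms
    // has elapsed past its rolling basis, which is the timestamp of the first
    // record appended to it when one is available, and the wall-clock creation
    // time of the segment otherwise (matching the assertions above).
    private static boolean shouldRollOnTime(long nowMs,
                                            long segmentMs,
                                            long firstRecordTimestampMs, // -1 when records carry no timestamp
                                            long segmentCreatedMs) {
        long basisMs = firstRecordTimestampMs >= 0 ? firstRecordTimestampMs : segmentCreatedMs;
        return nowMs - basisMs > segmentMs;
    }
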
+ @Test
+ public void testRollSegmentThatAlreadyExists() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();
+ int partitionLeaderEpoch = 0;
+
+ // create a log
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "Log begins with a single
empty segment.");
+
+ // rolling the empty active segment with the same base offset should
recreate the segment
+ log.roll(Optional.of(0L));
+ assertEquals(1, log.numberOfSegments(), "Expect 1 segment after roll()
empty segment with base offset.");
+
+ // should be able to append records to active segment
+ MemoryRecords records = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"k1".getBytes(), "v1".getBytes())),
+ 0L, partitionLeaderEpoch);
+ log.appendAsFollower(records, partitionLeaderEpoch);
+ assertEquals(1, log.numberOfSegments(), "Expect one segment.");
+ assertEquals(0L, log.activeSegment().baseOffset());
+
+ // make sure we can append more records
+ MemoryRecords records2 = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds() + 10,
"k2".getBytes(), "v2".getBytes())),
+ 1L, partitionLeaderEpoch);
+ log.appendAsFollower(records2, partitionLeaderEpoch);
+
+ assertEquals(2, log.logEndOffset(), "Expect two records in the log");
+ assertEquals(0, log.read(0, 1, FetchIsolation.LOG_END,
true).records.batches().iterator().next().lastOffset());
+ assertEquals(1, log.read(1, 1, FetchIsolation.LOG_END,
true).records.batches().iterator().next().lastOffset());
+
+ // roll so that active segment is empty
+ log.roll();
+ assertEquals(2L, log.activeSegment().baseOffset(), "Expect base offset
of active segment to be LEO");
+ assertEquals(2, log.numberOfSegments(), "Expect two segments.");
+
+ // manually resize offset index to force roll of an empty active
segment on next append
+ log.activeSegment().offsetIndex().resize(0);
+ MemoryRecords records3 = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds() + 12,
"k3".getBytes(), "v3".getBytes())),
+ 2L, partitionLeaderEpoch);
+ log.appendAsFollower(records3, partitionLeaderEpoch);
+ assertTrue(log.activeSegment().offsetIndex().maxEntries() > 1);
+ assertEquals(2, log.read(2, 1, FetchIsolation.LOG_END,
true).records.batches().iterator().next().lastOffset());
+ assertEquals(2, log.numberOfSegments(), "Expect two segments.");
+ }
+
+ @Test
+ public void testNonSequentialAppend() throws IOException {
+ // create a log
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ long pid = 1L;
+ short epoch = 0;
+
+ MemoryRecords records = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 0, 0L);
+ log.appendAsLeader(records, 0);
+
+ MemoryRecords nextRecords = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 2, 0L);
+ assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(nextRecords, 0));
+ }
+
+ @Test
+ public void testTruncateToEndOffsetClearsEpochCache() throws IOException {
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+ // Seed some initial data in the log
+ MemoryRecords records = LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())),
+ 27, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ appendAsFollower(log, records, 19);
+ assertEquals(Optional.of(new EpochEntry(19, 27)),
log.leaderEpochCache().latestEntry());
+ assertEquals(29, log.logEndOffset());
+
+ // Truncations greater than or equal to the log end offset should
+ // clear the epoch cache
+ verifyTruncationClearsEpochCache(log, 20, log.logEndOffset());
+ verifyTruncationClearsEpochCache(log, 24, log.logEndOffset() + 1);
+ }
+
+ private void verifyTruncationClearsEpochCache(UnifiedLog log, int epoch,
long truncationOffset) {
+ // Simulate becoming a leader
+ log.assignEpochStartOffset(epoch, log.logEndOffset());
+ assertEquals(Optional.of(new EpochEntry(epoch, 29)),
log.leaderEpochCache().latestEntry());
+ assertEquals(29, log.logEndOffset());
+
+ // Now we become the follower and truncate to an offset greater
+ // than or equal to the log end offset. The trivial epoch entry
+ // at the end of the log should be gone
+ log.truncateTo(truncationOffset);
+ assertEquals(Optional.of(new EpochEntry(19, 27)),
log.leaderEpochCache().latestEntry());
+ assertEquals(29, log.logEndOffset());
+ }
+
+ /**
+ * Test the values returned by the logSegments call
+ */
+ @Test
+ public void testLogSegmentsCallCorrect() throws IOException {
+ // Create 3 segments and make sure we get the right values from
various logSegments calls.
+ Supplier<MemoryRecords> createRecords = () ->
LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())),
mockTime.milliseconds());
+
+ int setSize = createRecords.get().sizeInBytes();
+ int msgPerSeg = 10;
+ int segmentSize = msgPerSeg * setSize; // each segment will be 10
messages
+ // create a log
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+
+ // append enough messages to roll segments based on size
+ for (int i = 1; i <= (2 * msgPerSeg + 2); i++) {
+ log.appendAsLeader(createRecords.get(), 0);
+ }
+ assertEquals(3, log.numberOfSegments(), "There should be exactly 3
segments.");
+
+ // from == to should always return an empty list
+ assertEquals(List.of(), getSegmentOffsets(log, 10, 10));
+ assertEquals(List.of(), getSegmentOffsets(log, 15, 15));
+
+ assertEquals(List.of(0L, 10L, 20L), getSegmentOffsets(log, 0, 21));
+
+ assertEquals(List.of(0L), getSegmentOffsets(log, 1, 5));
+ assertEquals(List.of(10L, 20L), getSegmentOffsets(log, 13, 21));
+ assertEquals(List.of(10L), getSegmentOffsets(log, 13, 17));
+
+ // from > to is invalid and should throw
+ assertThrows(IllegalArgumentException.class, () -> log.logSegments(10,
0));
+ }
+
+ private List<Long> getSegmentOffsets(UnifiedLog log, long from, long to) {
+ return log.logSegments(from,
to).stream().map(LogSegment::baseOffset).toList();
+ }
+
+ @Test
+ public void testInitializationOfProducerSnapshotsUpgradePath() throws
IOException {
+ // simulate the upgrade path by creating a new log with several
segments, deleting the
+ // snapshot files, and then reloading the log
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(64 * 10).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset());
+
+ for (int i = 0; i <= 100; i++) {
+ SimpleRecord record = new SimpleRecord(mockTime.milliseconds(),
String.valueOf(i).getBytes());
+ log.appendAsLeader(LogTestUtils.records(List.of(record)), 0);
+ }
+ assertTrue(log.logSegments().size() >= 2);
+ long logEndOffset = log.logEndOffset();
+ log.close();
+
+ LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+ // Reload after clean shutdown
+ log = createLog(logDir, logConfig, 0L, logEndOffset, brokerTopicStats,
mockTime.scheduler, mockTime,
+ producerStateManagerConfig, true, Optional.empty(), false);
+ List<Long> segmentOffsets = log.logSegments().stream()
+ .map(LogSegment::baseOffset)
+ .toList();
+ int size = segmentOffsets.size();
+ List<Long> expectedSnapshotOffsets = new ArrayList<>(size >= 2 ?
segmentOffsets.subList(size - 2, size) : segmentOffsets);
+ expectedSnapshotOffsets.add(log.logEndOffset());
+ assertEquals(expectedSnapshotOffsets,
LogTestUtils.listProducerSnapshotOffsets(logDir));
+ log.close();
+
+ LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+ // Reload after unclean shutdown with recoveryPoint set to log end
offset
+ log = createLog(logDir, logConfig, 0L, logEndOffset, brokerTopicStats,
mockTime.scheduler, mockTime,
+ producerStateManagerConfig, false, Optional.empty(), false);
+ assertEquals(expectedSnapshotOffsets,
LogTestUtils.listProducerSnapshotOffsets(logDir));
+ log.close();
+
+ LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+ // Reload after unclean shutdown with recoveryPoint set to 0
+ log = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats,
mockTime.scheduler, mockTime,
+ producerStateManagerConfig, false, Optional.empty(), false);
+ // We progressively create a snapshot for each segment after the
recovery point
+ segmentOffsets = log.logSegments().stream()
+ .map(LogSegment::baseOffset)
+ .toList();
+ expectedSnapshotOffsets = new ArrayList<>(segmentOffsets.subList(1,
segmentOffsets.size()));
+ expectedSnapshotOffsets.add(log.logEndOffset());
+ assertEquals(expectedSnapshotOffsets,
LogTestUtils.listProducerSnapshotOffsets(logDir));
+ log.close();
+ }
+
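
As a cross-check on the expected snapshot offsets above, a small sketch of how an offset maps to a producer snapshot file on disk, assuming the LogFileUtils.producerSnapshotFile helper from the storage module:

    // Sketch: each producer state snapshot is a file in the log directory named
    // after the zero-padded offset it covers, with a ".snapshot" suffix.
    File snapshotFile = LogFileUtils.producerSnapshotFile(logDir, 42L);
    assertTrue(snapshotFile.getName().endsWith(".snapshot"));
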
+ @Test
+ public void testLogReinitializeAfterManualDelete() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+ // simulate a case where log data does not exist but the start offset
is non-zero
+ UnifiedLog log = createLog(logDir, logConfig, 500L, 0L,
brokerTopicStats, mockTime.scheduler, mockTime,
+ producerStateManagerConfig, true, Optional.empty(), false);
+ assertEquals(500, log.logStartOffset());
+ assertEquals(500, log.logEndOffset());
+ }
+
+ /**
+ * Test that the "PeriodicProducerExpirationCheck" scheduled task gets
canceled after the log
+ * is deleted.
+ */
+ @Test
+ public void testProducerExpireCheckAfterDelete() throws Exception {
+ KafkaScheduler scheduler = new KafkaScheduler(1);
+ try {
+ scheduler.startup();
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+ UnifiedLog log = createLog(logDir, logConfig, 0L, 0L,
brokerTopicStats, scheduler, mockTime,
+ producerStateManagerConfig, true, Optional.empty(), false);
+
+ ScheduledFuture<?> producerExpireCheck = log.producerExpireCheck();
+ assertTrue(scheduler.taskRunning(producerExpireCheck),
"producerExpireCheck isn't as part of scheduled tasks");
+
+ log.delete();
+ assertFalse(scheduler.taskRunning(producerExpireCheck),
+ "producerExpireCheck is part of scheduled tasks even after
log deletion");
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ @Test
+ public void testProducerIdMapOffsetUpdatedForNonIdempotentData() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ MemoryRecords records = LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds(), "key".getBytes(), "value".getBytes())));
+ log.appendAsLeader(records, 0);
+ log.takeProducerSnapshot();
+ assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset());
+ }
+
+ @Test
+ public void testRebuildProducerIdMapWithCompactedData() throws IOException
{
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid = 1L;
+ short producerEpoch = 0;
+ int partitionLeaderEpoch = 0;
+ int seq = 0;
+ long baseOffset = 23L;
+
+ // create a batch with a couple of gaps to simulate compaction
+ MemoryRecords records = LogTestUtils.records(
+ List.of(
+ new SimpleRecord(mockTime.milliseconds(),
"a".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "b".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"c".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "d".getBytes())
+ ),
+ pid, producerEpoch, seq, baseOffset
+ );
+ records.batches().forEach(b ->
b.setPartitionLeaderEpoch(partitionLeaderEpoch));
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
+ @Override
+ public MemoryRecords.RecordFilter.BatchRetentionResult
checkBatchRetention(RecordBatch batch) {
+ return new
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY,
false);
+ }
+ @Override
+ public boolean shouldRetainRecord(RecordBatch recordBatch, Record
record) {
+ return !record.hasKey();
+ }
+ }, filtered, BufferSupplier.NO_CACHING);
+ filtered.flip();
+ MemoryRecords filteredRecords =
MemoryRecords.readableRecords(filtered);
+
+ log.appendAsFollower(filteredRecords, partitionLeaderEpoch);
+
+ // append some more data and then truncate to force rebuilding of the
PID map
+ MemoryRecords moreRecords = LogTestUtils.records(
+ List.of(
+ new SimpleRecord(mockTime.milliseconds(),
"e".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"f".getBytes())),
+ baseOffset + 4, RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+ appendAsFollower(log, moreRecords, partitionLeaderEpoch);
+
+ log.truncateTo(baseOffset + 4);
+
+ Map<Long, Integer> activeProducers =
log.activeProducersWithLastSequence();
+ assertTrue(activeProducers.containsKey(pid));
+
+ int lastSeq = activeProducers.get(pid);
+ assertEquals(3, lastSeq);
+ }
+
+ @Test
+ public void testRebuildProducerStateWithEmptyCompactedBatch() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid = 1L;
+ short producerEpoch = 0;
+ int partitionLeaderEpoch = 0;
+ int seq = 0;
+ long baseOffset = 23L;
+
+ // create a batch that will be emptied by the filter below to simulate compaction
+ MemoryRecords records = LogTestUtils.records(
+ List.of(
+ new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "a".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "b".getBytes())),
+ pid, producerEpoch, seq, baseOffset
+ );
+ records.batches().forEach(b ->
b.setPartitionLeaderEpoch(partitionLeaderEpoch));
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
+ @Override
+ public MemoryRecords.RecordFilter.BatchRetentionResult
checkBatchRetention(RecordBatch batch) {
+ return new
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY,
true);
+ }
+ @Override
+ public boolean shouldRetainRecord(RecordBatch recordBatch, Record
record) {
+ return false;
+ }
+ }, filtered, BufferSupplier.NO_CACHING);
+ filtered.flip();
+ MemoryRecords filteredRecords =
MemoryRecords.readableRecords(filtered);
+
+ log.appendAsFollower(filteredRecords, partitionLeaderEpoch);
+
+ // append some more data and then truncate to force rebuilding of the
PID map
+ MemoryRecords moreRecords = LogTestUtils.records(
+ List.of(
+ new SimpleRecord(mockTime.milliseconds(),
"e".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"f".getBytes())),
+ baseOffset + 2, RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+ appendAsFollower(log, moreRecords, partitionLeaderEpoch);
+
+ log.truncateTo(baseOffset + 2);
+
+ Map<Long, Integer> activeProducers =
log.activeProducersWithLastSequence();
+ assertTrue(activeProducers.containsKey(pid));
+
+ int lastSeq = activeProducers.get(pid);
+ assertEquals(1, lastSeq);
+ }
+
+ @Test
+ public void testUpdateProducerIdMapWithCompactedData() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid = 1L;
+ short producerEpoch = 0;
+ int partitionLeaderEpoch = 0;
+ int seq = 0;
+ long baseOffset = 23L;
+
+ // create a batch with a couple of gaps to simulate compaction
+ MemoryRecords records = LogTestUtils.records(
+ List.of(
+ new SimpleRecord(mockTime.milliseconds(),
"a".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "b".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"c".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "d".getBytes())),
+ pid, producerEpoch, seq, baseOffset
+ );
+ records.batches().forEach(b ->
b.setPartitionLeaderEpoch(partitionLeaderEpoch));
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
+ @Override
+ public MemoryRecords.RecordFilter.BatchRetentionResult
checkBatchRetention(RecordBatch batch) {
+ return new
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY,
false);
+ }
+ @Override
+ public boolean shouldRetainRecord(RecordBatch recordBatch, Record
record) {
+ return !record.hasKey();
+ }
+ }, filtered, BufferSupplier.NO_CACHING);
+ filtered.flip();
+ MemoryRecords filteredRecords =
MemoryRecords.readableRecords(filtered);
+
+ log.appendAsFollower(filteredRecords, partitionLeaderEpoch);
+ Map<Long, Integer> activeProducers =
log.activeProducersWithLastSequence();
+ assertTrue(activeProducers.containsKey(pid));
+
+ int lastSeq = activeProducers.get(pid);
+ assertEquals(3, lastSeq);
+ }
+
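
The three tests above repeat the same MemoryRecords.filterTo pattern; a compact sketch of that usage as a standalone helper (the helper itself is illustrative and not part of the change):

    // Illustrative helper: drop every keyed record from the given batch, the
    // way the tests above simulate compaction, and return the filtered records.
    private static MemoryRecords dropKeyedRecords(MemoryRecords records) {
        ByteBuffer filtered = ByteBuffer.allocate(2048);
        records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
            @Override
            public MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                return new MemoryRecords.RecordFilter.BatchRetentionResult(
                    MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY, false);
            }
            @Override
            public boolean shouldRetainRecord(RecordBatch batch, Record record) {
                return !record.hasKey();
            }
        }, filtered, BufferSupplier.NO_CACHING);
        filtered.flip();
        return MemoryRecords.readableRecords(filtered);
    }
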
+ @Test
+ public void testProducerIdMapTruncateTo() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes()))), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("b".getBytes()))), 0);
+ log.takeProducerSnapshot();
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("c".getBytes()))), 0);
+ log.takeProducerSnapshot();
+
+ log.truncateTo(2);
+ assertEquals(OptionalLong.of(2), log.latestProducerSnapshotOffset());
+ assertEquals(2, log.latestProducerStateEndOffset());
+
+ log.truncateTo(1);
+ assertEquals(OptionalLong.of(1), log.latestProducerSnapshotOffset());
+ assertEquals(1, log.latestProducerStateEndOffset());
+
+ log.truncateTo(0);
+ assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset());
+ assertEquals(0, log.latestProducerStateEndOffset());
+ }
+
+ @Test
+ public void testProducerIdMapTruncateToWithNoSnapshots() throws
IOException {
+ // This ensures that the upgrade optimization path cannot be hit after
initial loading
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid = 1L;
+ short epoch = 0;
+
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord("a".getBytes())),
+ pid, epoch, 0, 0L), 0);
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord("b".getBytes())),
+ pid, epoch, 1, 0L), 0);
+
+ LogTestUtils.deleteProducerSnapshotFiles(logDir);
+
+ log.truncateTo(1L);
+ assertEquals(1, log.activeProducersWithLastSequence().size());
+
+ int lastSeq = log.activeProducersWithLastSequence().get(pid);
+ assertEquals(0, lastSeq);
+ }
+
+ @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with
createEmptyActiveSegment: {0}")
+ @ValueSource(booleans = {true, false})
+ public void testRetentionDeletesProducerStateSnapshots(boolean
createEmptyActiveSegment) throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(0)
+ .retentionMs(1000 * 60)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid1 = 1L;
+ short epoch = 0;
+
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord("a".getBytes())),
+ pid1, epoch, 0, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord("b".getBytes())),
+ pid1, epoch, 1, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord("c".getBytes())),
+ pid1, epoch, 2, 0L), 0);
+ if (createEmptyActiveSegment) {
+ log.roll();
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ int numProducerSnapshots = createEmptyActiveSegment ? 3 : 2;
+ assertEquals(numProducerSnapshots,
ProducerStateManager.listSnapshotFiles(logDir).size());
+ // Sleep to breach the retention period
+ mockTime.sleep(1000 * 60 + 1);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ // Sleep to breach the file delete delay and run scheduled file
deletion tasks
+ mockTime.sleep(1);
+ assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+ "expect a single producer state snapshot remaining");
+ assertEquals(3, log.logStartOffset());
+ }
+
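
For reference, a rough sketch of the same configuration expressed as raw topic properties (assuming org.apache.kafka.common.config.TopicConfig is imported; treating the builder as a thin wrapper over these keys is an assumption):

    // Sketch: roughly equivalent LogConfig built from plain topic-level properties.
    Properties props = new Properties();
    props.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(TEN_KB));
    props.put(TopicConfig.RETENTION_BYTES_CONFIG, "0");
    props.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(1000 * 60));
    props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0");
    LogConfig logConfig = new LogConfig(props);
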
+ private void assertValidLogOffsetMetadata(UnifiedLog log,
LogOffsetMetadata offsetMetadata) throws IOException {
+ assertFalse(offsetMetadata.messageOffsetOnly());
+
+ long segmentBaseOffset = offsetMetadata.segmentBaseOffset;
+ List<LogSegment> segments = log.logSegments(segmentBaseOffset,
segmentBaseOffset + 1);
+ assertFalse(segments.isEmpty());
+
+ LogSegment segment = segments.iterator().next();
+ assertEquals(segmentBaseOffset, segment.baseOffset());
+ assertTrue(offsetMetadata.relativePositionInSegment <= segment.size());
+
+ FetchDataInfo readInfo = segment.read(offsetMetadata.messageOffset,
+ 2048,
+ Optional.of((long) segment.size()),
+ false);
+
+ if (offsetMetadata.relativePositionInSegment < segment.size()) {
+ assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata);
+ } else {
+ assertNull(readInfo);
+ }
+ }
+
private void append(int epoch, long startOffset, int count) {
Function<Integer, MemoryRecords> records = i ->
records(List.of(new SimpleRecord("value".getBytes())),
startOffset + i, epoch);
@@ -630,6 +1632,11 @@ public class UnifiedLogTest {
}
}
+ private void appendAsFollower(UnifiedLog log, MemoryRecords records, int
leaderEpoch) {
+ records.batches().forEach(b -> b.setPartitionLeaderEpoch(leaderEpoch));
+ log.appendAsFollower(records, leaderEpoch);
+ }
+
private LeaderEpochFileCache epochCache(UnifiedLog log) {
return log.leaderEpochCache();
}
@@ -639,25 +1646,28 @@ public class UnifiedLogTest {
}
private UnifiedLog createLog(File dir, LogConfig config, boolean
remoteStorageSystemEnable) throws IOException {
- return createLog(dir, config, this.brokerTopicStats,
mockTime.scheduler, this.mockTime,
- this.producerStateManagerConfig, Optional.empty(),
remoteStorageSystemEnable);
+ return createLog(dir, config, 0L, 0L, brokerTopicStats,
mockTime.scheduler, mockTime,
+ producerStateManagerConfig, true, Optional.empty(),
remoteStorageSystemEnable);
}
private UnifiedLog createLog(
File dir,
LogConfig config,
+ long logStartOffset,
+ long recoveryPoint,
BrokerTopicStats brokerTopicStats,
Scheduler scheduler,
MockTime time,
ProducerStateManagerConfig producerStateManagerConfig,
+ boolean lastShutdownClean,
Optional<Uuid> topicId,
boolean remoteStorageSystemEnable) throws IOException {
UnifiedLog log = UnifiedLog.create(
dir,
config,
- 0L,
- 0L,
+ logStartOffset,
+ recoveryPoint,
scheduler,
brokerTopicStats,
time,
@@ -665,14 +1675,14 @@ public class UnifiedLogTest {
producerStateManagerConfig,
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
new LogDirFailureChannel(10),
- true,
+ lastShutdownClean,
topicId,
new ConcurrentHashMap<>(),
remoteStorageSystemEnable,
LogOffsetsListener.NO_OP_OFFSETS_LISTENER
);
- this.logsToClose.add(log);
+ logsToClose.add(log);
return log;
}