This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 95dc9d9eede Move IndexEntry and related to storage module (#12993) 95dc9d9eede is described below commit 95dc9d9eede40deb303c9c2b3365bfb0abdd3330 Author: Ismael Juma <ism...@juma.me.uk> AuthorDate: Sat Dec 17 10:07:11 2022 -0800 Move IndexEntry and related to storage module (#12993) For broader context on this change, please check: * KAFKA-14470: Move log layer to storage module Reviewers: dengziming <dengziming1...@gmail.com> --- core/src/main/scala/kafka/log/AbstractIndex.scala | 2 +- core/src/main/scala/kafka/log/IndexEntry.scala | 52 ---------------- core/src/main/scala/kafka/log/LocalLog.scala | 3 +- core/src/main/scala/kafka/log/LogLoader.scala | 1 + core/src/main/scala/kafka/log/LogSegment.scala | 15 ++--- core/src/main/scala/kafka/log/OffsetIndex.scala | 8 +-- core/src/main/scala/kafka/log/TimeIndex.scala | 10 ++-- .../main/scala/kafka/log/TransactionIndex.scala | 1 + .../scala/unit/kafka/log/OffsetIndexTest.scala | 40 ++++++------- .../test/scala/unit/kafka/log/TimeIndexTest.scala | 18 +++--- .../unit/kafka/log/TransactionIndexTest.scala | 1 + .../log/internals/CorruptIndexException.java | 15 +++-- .../kafka/server/log/internals/IndexEntry.java | 14 +++-- .../kafka/server/log/internals/OffsetPosition.java | 70 ++++++++++++++++++++++ .../server/log/internals/TimestampOffset.java | 70 ++++++++++++++++++++++ 15 files changed, 209 insertions(+), 111 deletions(-) diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 37cd4b9f55c..1c8032115d8 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -22,11 +22,11 @@ import java.nio.channels.FileChannel import java.nio.file.Files import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.concurrent.locks.{Lock, ReentrantLock} - import kafka.common.IndexOffsetOverflowException import kafka.utils.CoreUtils.inLock import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils} +import org.apache.kafka.server.log.internals.IndexEntry /** * The abstract index class which holds entry format agnostic methods. diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala deleted file mode 100644 index 705366e32f5..00000000000 --- a/core/src/main/scala/kafka/log/IndexEntry.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import org.apache.kafka.common.requests.ListOffsetsResponse - -sealed trait IndexEntry { - // We always use Long for both key and value to avoid boxing. - def indexKey: Long - def indexValue: Long -} - -/** - * The mapping between a logical log offset and the physical position - * in some log file of the beginning of the message set entry with the - * given offset. - */ -case class OffsetPosition(offset: Long, position: Int) extends IndexEntry { - override def indexKey = offset - override def indexValue = position.toLong -} - - -/** - * The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater - * than that timestamp must be at or after that offset. - * @param timestamp The max timestamp before the given offset. - * @param offset The message offset. - */ -case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry { - override def indexKey = timestamp - override def indexValue = offset -} - -object TimestampOffset { - val Unknown = TimestampOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, ListOffsetsResponse.UNKNOWN_OFFSET) -} diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index b0e7b0e446e..56172333351 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.log.internals.OffsetPosition import scala.jdk.CollectionConverters._ import scala.collection.{Seq, immutable} @@ -440,7 +441,7 @@ class LocalLog(@volatile private var _dir: File, private def addAbortedTransactions(startOffset: Long, segment: LogSegment, fetchInfo: FetchDataInfo): FetchDataInfo = { val fetchSize = fetchInfo.records.sizeInBytes - val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, + val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { segments.higherSegment(segment.baseOffset).map(_.baseOffset).getOrElse(logEndOffset) diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index f7def8b6053..2eb055ba167 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.InvalidOffsetException import org.apache.kafka.common.utils.Time import org.apache.kafka.snapshot.Snapshots +import org.apache.kafka.server.log.internals.CorruptIndexException import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import scala.collection.{Set, mutable} diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index eb9ff4637cc..4da227556f5 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} +import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset} import scala.jdk.CollectionConverters._ import scala.math._ @@ -102,10 +103,10 @@ class LogSegment private[log] (val log: FileRecords, @volatile private var rollingBasedTimestamp: Option[Long] = None /* The maximum timestamp and offset we see so far */ - @volatile private var _maxTimestampAndOffsetSoFar: TimestampOffset = TimestampOffset.Unknown + @volatile private var _maxTimestampAndOffsetSoFar: TimestampOffset = TimestampOffset.UNKNOWN def maxTimestampAndOffsetSoFar_= (timestampOffset: TimestampOffset): Unit = _maxTimestampAndOffsetSoFar = timestampOffset def maxTimestampAndOffsetSoFar: TimestampOffset = { - if (_maxTimestampAndOffsetSoFar == TimestampOffset.Unknown) + if (_maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) _maxTimestampAndOffsetSoFar = timeIndex.lastEntry _maxTimestampAndOffsetSoFar } @@ -161,7 +162,7 @@ class LogSegment private[log] (val log: FileRecords, trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset") // Update the in memory max timestamp and corresponding offset. if (largestTimestamp > maxTimestampSoFar) { - maxTimestampAndOffsetSoFar = TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp) + maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp) } // append an entry to the index (if needed) if (bytesSinceLastIndexEntry > indexIntervalBytes) { @@ -340,7 +341,7 @@ class LogSegment private[log] (val log: FileRecords, txnIndex.reset() var validBytes = 0 var lastIndexEntry = 0 - maxTimestampAndOffsetSoFar = TimestampOffset.Unknown + maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN try { for (batch <- log.batches.asScala) { batch.ensureValid() @@ -348,7 +349,7 @@ class LogSegment private[log] (val log: FileRecords, // The max timestamp is exposed at the batch level, so no need to iterate the records if (batch.maxTimestamp > maxTimestampSoFar) { - maxTimestampAndOffsetSoFar = TimestampOffset(batch.maxTimestamp, batch.lastOffset) + maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp, batch.lastOffset) } // Build offset index @@ -393,7 +394,7 @@ class LogSegment private[log] (val log: FileRecords, // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry. val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position) if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) { - maxTimestampAndOffsetSoFar = TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset) + maxTimestampAndOffsetSoFar = new TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset) } } @@ -590,7 +591,7 @@ class LogSegment private[log] (val log: FileRecords, * Close this log segment */ def close(): Unit = { - if (_maxTimestampAndOffsetSoFar != TimestampOffset.Unknown) + if (_maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true), this) CoreUtils.swallow(lazyOffsetIndex.close(), this) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 62afbac930e..3b3ea461fbd 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -19,10 +19,10 @@ package kafka.log import java.io.File import java.nio.ByteBuffer - import kafka.utils.CoreUtils.inLock import kafka.utils.Logging import org.apache.kafka.common.errors.InvalidOffsetException +import org.apache.kafka.server.log.internals.{CorruptIndexException, OffsetPosition} /** * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: @@ -68,7 +68,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl private def lastEntry: OffsetPosition = { inLock(lock) { _entries match { - case 0 => OffsetPosition(baseOffset, 0) + case 0 => new OffsetPosition(baseOffset, 0) case s => parseEntry(mmap, s - 1) } } @@ -90,7 +90,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl val idx = mmap.duplicate val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY) if(slot == -1) - OffsetPosition(baseOffset, 0) + new OffsetPosition(baseOffset, 0) else parseEntry(idx, slot) } @@ -117,7 +117,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4) override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = { - OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) + new OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) } /** diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index 2c464d602ff..6b22e2f3eaf 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -19,11 +19,11 @@ package kafka.log import java.io.File import java.nio.ByteBuffer - import kafka.utils.CoreUtils.inLock import kafka.utils.Logging import org.apache.kafka.common.errors.InvalidOffsetException import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset} /** * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be @@ -76,7 +76,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: private def lastEntryFromIndexFile: TimestampOffset = { inLock(lock) { _entries match { - case 0 => TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) + case 0 => new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) case s => parseEntry(mmap, s - 1) } } @@ -97,7 +97,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: } override def parseEntry(buffer: ByteBuffer, n: Int): TimestampOffset = { - TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n)) + new TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n)) } /** @@ -134,7 +134,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: mmap.putLong(timestamp) mmap.putInt(relativeOffset(offset)) _entries += 1 - _lastEntry = TimestampOffset(timestamp, offset) + _lastEntry = new TimestampOffset(timestamp, offset) require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.") } } @@ -153,7 +153,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: val idx = mmap.duplicate val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY) if (slot == -1) - TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) + new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) else parseEntry(idx, slot) } diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala index d7967e0213a..72321db31ee 100644 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -24,6 +24,7 @@ import kafka.utils.{Logging, nonthreadsafe} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.internals.CorruptIndexException import scala.collection.mutable.ListBuffer diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index 96cff3f9cd0..d2a64373d75 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -19,16 +19,16 @@ package kafka.log import java.io._ import java.nio.file.Files - import org.junit.jupiter.api.Assertions._ -import java.util.{Arrays, Collections} +import java.util.{Arrays, Collections} import org.junit.jupiter.api._ import scala.collection._ import scala.util.Random import kafka.utils.TestUtils import org.apache.kafka.common.errors.InvalidOffsetException +import org.apache.kafka.server.log.internals.OffsetPosition import scala.annotation.nowarn @@ -52,7 +52,7 @@ class OffsetIndexTest { @nowarn("cat=deprecation") @Test def randomLookupTest(): Unit = { - assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(92L), + assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(92L), "Not present value should return physical offset 0.") // append some random values @@ -63,7 +63,7 @@ class OffsetIndexTest { // should be able to find all those values for((logical, physical) <- vals) - assertEquals(OffsetPosition(logical, physical), idx.lookup(logical), + assertEquals(new OffsetPosition(logical, physical), idx.lookup(logical), "Should be able to find values that are present.") // for non-present values we should find the offset of the largest value less than or equal to this @@ -73,9 +73,9 @@ class OffsetIndexTest { for(offset <- offsets.take(30)) { val rightAnswer = if(offset < valMap.firstKey) - OffsetPosition(idx.baseOffset, 0) + new OffsetPosition(idx.baseOffset, 0) else - OffsetPosition(valMap.to(offset).last._1, valMap.to(offset).last._2._2) + new OffsetPosition(valMap.to(offset).last._1, valMap.to(offset).last._2._2) assertEquals(rightAnswer, idx.lookup(offset), "The index should give the same answer as the sorted map") } @@ -83,13 +83,13 @@ class OffsetIndexTest { @Test def lookupExtremeCases(): Unit = { - assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset), + assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset), "Lookup on empty file") for(i <- 0 until idx.maxEntries) idx.append(idx.baseOffset + i + 1, i) // check first and last entry - assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset)) - assertEquals(OffsetPosition(idx.baseOffset + idx.maxEntries, idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries)) + assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset)) + assertEquals(new OffsetPosition(idx.baseOffset + idx.maxEntries, idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries)) } @Test @@ -97,7 +97,7 @@ class OffsetIndexTest { for (i <- 0 until idx.maxEntries) idx.append(idx.baseOffset + i + 1, i) for (i <- 0 until idx.maxEntries) - assertEquals(OffsetPosition(idx.baseOffset + i + 1, i), idx.entry(i)) + assertEquals(new OffsetPosition(idx.baseOffset + i + 1, i), idx.entry(i)) } @Test @@ -122,10 +122,10 @@ class OffsetIndexTest { @Test def testFetchUpperBoundOffset(): Unit = { - val first = OffsetPosition(baseOffset + 0, 0) - val second = OffsetPosition(baseOffset + 1, 10) - val third = OffsetPosition(baseOffset + 2, 23) - val fourth = OffsetPosition(baseOffset + 3, 37) + val first = new OffsetPosition(baseOffset + 0, 0) + val second = new OffsetPosition(baseOffset + 1, 10) + val third = new OffsetPosition(baseOffset + 2, 23) + val fourth = new OffsetPosition(baseOffset + 3, 37) assertEquals(None, idx.fetchUpperBoundOffset(first, 5)) @@ -144,8 +144,8 @@ class OffsetIndexTest { @Test def testReopen(): Unit = { - val first = OffsetPosition(51, 0) - val sec = OffsetPosition(52, 1) + val first = new OffsetPosition(51, 0) + val sec = new OffsetPosition(52, 1) idx.append(first.offset, first.position) idx.append(sec.offset, sec.position) idx.close() @@ -166,28 +166,28 @@ class OffsetIndexTest { // now check the last offset after various truncate points and validate that we can still append to the index. idx.truncateTo(12) - assertEquals(OffsetPosition(9, 9), idx.lookup(10), + assertEquals(new OffsetPosition(9, 9), idx.lookup(10), "Index should be unchanged by truncate past the end") assertEquals(9, idx.lastOffset, "9 should be the last entry in the index") idx.append(10, 10) idx.truncateTo(10) - assertEquals(OffsetPosition(9, 9), idx.lookup(10), + assertEquals(new OffsetPosition(9, 9), idx.lookup(10), "Index should be unchanged by truncate at the end") assertEquals(9, idx.lastOffset, "9 should be the last entry in the index") idx.append(10, 10) idx.truncateTo(9) - assertEquals(OffsetPosition(8, 8), idx.lookup(10), + assertEquals(new OffsetPosition(8, 8), idx.lookup(10), "Index should truncate off last entry") assertEquals(8, idx.lastOffset, "8 should be the last entry in the index") idx.append(9, 9) idx.truncateTo(5) - assertEquals(OffsetPosition(4, 4), idx.lookup(10), + assertEquals(new OffsetPosition(4, 4), idx.lookup(10), "4 should be the last entry in the index") assertEquals(4, idx.lastOffset, "4 should be the last entry in the index") diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala index 04d4adfab7c..49ce3f64ba6 100644 --- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala @@ -18,9 +18,9 @@ package kafka.log import java.io.File - import kafka.utils.TestUtils import org.apache.kafka.common.errors.InvalidOffsetException +import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} @@ -46,26 +46,26 @@ class TimeIndexTest { @Test def testLookUp(): Unit = { // Empty time index - assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(100L)) + assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(100L)) // Add several time index entries. appendEntries(maxEntries - 1) // look for timestamp smaller than the earliest entry - assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(9)) + assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(9)) // look for timestamp in the middle of two entries. - assertEquals(TimestampOffset(20L, 65L), idx.lookup(25)) + assertEquals(new TimestampOffset(20L, 65L), idx.lookup(25)) // look for timestamp same as the one in the entry - assertEquals(TimestampOffset(30L, 75L), idx.lookup(30)) + assertEquals(new TimestampOffset(30L, 75L), idx.lookup(30)) } @Test def testEntry(): Unit = { appendEntries(maxEntries - 1) - assertEquals(TimestampOffset(10L, 55L), idx.entry(0)) - assertEquals(TimestampOffset(20L, 65L), idx.entry(1)) - assertEquals(TimestampOffset(30L, 75L), idx.entry(2)) - assertEquals(TimestampOffset(40L, 85L), idx.entry(3)) + assertEquals(new TimestampOffset(10L, 55L), idx.entry(0)) + assertEquals(new TimestampOffset(20L, 65L), idx.entry(1)) + assertEquals(new TimestampOffset(30L, 75L), idx.entry(2)) + assertEquals(new TimestampOffset(40L, 85L), idx.entry(3)) } @Test diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala index 1f0f91aa2ad..9c1cbe3f307 100644 --- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala @@ -18,6 +18,7 @@ package kafka.log import kafka.utils.TestUtils import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.server.log.internals.CorruptIndexException import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/main/scala/kafka/log/CorruptIndexException.scala b/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptIndexException.java similarity index 67% copy from core/src/main/scala/kafka/log/CorruptIndexException.scala copy to storage/src/main/java/org/apache/kafka/server/log/internals/CorruptIndexException.java index b39ee5b3bbb..cf3880598a2 100644 --- a/core/src/main/scala/kafka/log/CorruptIndexException.scala +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptIndexException.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,7 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.kafka.server.log.internals; -package kafka.log - -class CorruptIndexException(message: String) extends RuntimeException(message) +public class CorruptIndexException extends RuntimeException { + public CorruptIndexException(String message) { + super(message); + } +} diff --git a/core/src/main/scala/kafka/log/CorruptIndexException.scala b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexEntry.java similarity index 72% rename from core/src/main/scala/kafka/log/CorruptIndexException.scala rename to storage/src/main/java/org/apache/kafka/server/log/internals/IndexEntry.java index b39ee5b3bbb..30b42ed4389 100644 --- a/core/src/main/scala/kafka/log/CorruptIndexException.scala +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexEntry.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,7 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.kafka.server.log.internals; -package kafka.log - -class CorruptIndexException(message: String) extends RuntimeException(message) +public interface IndexEntry { + long indexKey(); + long indexValue(); +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetPosition.java b/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetPosition.java new file mode 100644 index 00000000000..7fa017c612a --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetPosition.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +/** + * The mapping between a logical log offset and the physical position + * in some log file of the beginning of the message set entry with the + * given offset. + */ +public final class OffsetPosition implements IndexEntry { + public final long offset; + public final int position; + + public OffsetPosition(long offset, int position) { + this.offset = offset; + this.position = position; + } + + @Override + public long indexKey() { + return offset; + } + + @Override + public long indexValue() { + return position; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + OffsetPosition that = (OffsetPosition) o; + + return offset == that.offset + && position == that.position; + } + + @Override + public int hashCode() { + int result = Long.hashCode(offset); + result = 31 * result + position; + return result; + } + + @Override + public String toString() { + return "OffsetPosition(" + + "offset=" + offset + + ", position=" + position + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TimestampOffset.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TimestampOffset.java new file mode 100644 index 00000000000..77b1bbe69a0 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TimestampOffset.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +/** + * The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater + * than that timestamp must be at or after that offset. + */ +public class TimestampOffset implements IndexEntry { + + public static final TimestampOffset UNKNOWN = new TimestampOffset(-1, -1); + + public final long timestamp; + public final long offset; + + /** + * Create a TimestampOffset with the provided parameters. + * + * @param timestamp The max timestamp before the given offset. + * @param offset The message offset. + */ + public TimestampOffset(long timestamp, long offset) { + this.timestamp = timestamp; + this.offset = offset; + } + + @Override + public long indexKey() { + return timestamp; + } + + @Override + public long indexValue() { + return offset; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TimestampOffset that = (TimestampOffset) o; + + return timestamp == that.timestamp + && offset == that.offset; + } + + @Override + public int hashCode() { + int result = Long.hashCode(timestamp); + result = 31 * result + Long.hashCode(offset); + return result; + } +}