This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95dc9d9eede Move IndexEntry and related to storage module (#12993)
95dc9d9eede is described below

commit 95dc9d9eede40deb303c9c2b3365bfb0abdd3330
Author: Ismael Juma <ism...@juma.me.uk>
AuthorDate: Sat Dec 17 10:07:11 2022 -0800

    Move IndexEntry and related to storage module (#12993)
    
    For broader context on this change, please check:
    
    * KAFKA-14470: Move log layer to storage module
    
    Reviewers: dengziming <dengziming1...@gmail.com>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala  |  2 +-
 core/src/main/scala/kafka/log/IndexEntry.scala     | 52 ----------------
 core/src/main/scala/kafka/log/LocalLog.scala       |  3 +-
 core/src/main/scala/kafka/log/LogLoader.scala      |  1 +
 core/src/main/scala/kafka/log/LogSegment.scala     | 15 ++---
 core/src/main/scala/kafka/log/OffsetIndex.scala    |  8 +--
 core/src/main/scala/kafka/log/TimeIndex.scala      | 10 ++--
 .../main/scala/kafka/log/TransactionIndex.scala    |  1 +
 .../scala/unit/kafka/log/OffsetIndexTest.scala     | 40 ++++++-------
 .../test/scala/unit/kafka/log/TimeIndexTest.scala  | 18 +++---
 .../unit/kafka/log/TransactionIndexTest.scala      |  1 +
 .../log/internals/CorruptIndexException.java       | 15 +++--
 .../kafka/server/log/internals/IndexEntry.java     | 14 +++--
 .../kafka/server/log/internals/OffsetPosition.java | 70 ++++++++++++++++++++++
 .../server/log/internals/TimestampOffset.java      | 70 ++++++++++++++++++++++
 15 files changed, 209 insertions(+), 111 deletions(-)
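
The bulk of the diff below is a mechanical call-site migration: the Scala case
classes in kafka.log are replaced by plain Java classes in the storage module,
so Scala callers gain an explicit "new" and the Unknown sentinel follows the
Java constant naming convention. A minimal before/after sketch in Scala (the
offset and position values are hypothetical):

    import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset}

    // Before: Scala case classes in kafka.log
    //   val pos = OffsetPosition(100L, 0)
    //   val ts  = TimestampOffset.Unknown
    // After: Java classes in org.apache.kafka.server.log.internals
    val pos = new OffsetPosition(100L, 0)  // hypothetical offset/position
    val ts  = TimestampOffset.UNKNOWN      // renamed sentinel constant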

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 37cd4b9f55c..1c8032115d8 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -22,11 +22,11 @@ import java.nio.channels.FileChannel
 import java.nio.file.Files
 import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.concurrent.locks.{Lock, ReentrantLock}
-
 import kafka.common.IndexOffsetOverflowException
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils}
+import org.apache.kafka.server.log.internals.IndexEntry
 
 /**
  * The abstract index class which holds entry format agnostic methods.
diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala
deleted file mode 100644
index 705366e32f5..00000000000
--- a/core/src/main/scala/kafka/log/IndexEntry.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import org.apache.kafka.common.requests.ListOffsetsResponse
-
-sealed trait IndexEntry {
-  // We always use Long for both key and value to avoid boxing.
-  def indexKey: Long
-  def indexValue: Long
-}
-
-/**
- * The mapping between a logical log offset and the physical position
- * in some log file of the beginning of the message set entry with the
- * given offset.
- */
-case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
-  override def indexKey = offset
-  override def indexValue = position.toLong
-}
-
-
-/**
- * The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater
- * than that timestamp must be at or after that offset.
- * @param timestamp The max timestamp before the given offset.
- * @param offset The message offset.
- */
-case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
-  override def indexKey = timestamp
-  override def indexValue = offset
-}
-
-object TimestampOffset {
-  val Unknown = TimestampOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, ListOffsetsResponse.UNKNOWN_OFFSET)
-}
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index b0e7b0e446e..56172333351 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.log.internals.OffsetPosition
 
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, immutable}
@@ -440,7 +441,7 @@ class LocalLog(@volatile private var _dir: File,
   private def addAbortedTransactions(startOffset: Long, segment: LogSegment,
                                      fetchInfo: FetchDataInfo): FetchDataInfo = {
     val fetchSize = fetchInfo.records.sizeInBytes
-    val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+    val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
       fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
     val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
       segments.higherSegment(segment.baseOffset).map(_.baseOffset).getOrElse(logEndOffset)
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index f7def8b6053..2eb055ba167 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidOffsetException
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.snapshot.Snapshots
+import org.apache.kafka.server.log.internals.CorruptIndexException
 
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import scala.collection.{Set, mutable}
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index eb9ff4637cc..4da227556f5 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset}
 
 import scala.jdk.CollectionConverters._
 import scala.math._
@@ -102,10 +103,10 @@ class LogSegment private[log] (val log: FileRecords,
   @volatile private var rollingBasedTimestamp: Option[Long] = None
 
   /* The maximum timestamp and offset we see so far */
-  @volatile private var _maxTimestampAndOffsetSoFar: TimestampOffset = TimestampOffset.Unknown
+  @volatile private var _maxTimestampAndOffsetSoFar: TimestampOffset = TimestampOffset.UNKNOWN
   def maxTimestampAndOffsetSoFar_= (timestampOffset: TimestampOffset): Unit = _maxTimestampAndOffsetSoFar = timestampOffset
   def maxTimestampAndOffsetSoFar: TimestampOffset = {
-    if (_maxTimestampAndOffsetSoFar == TimestampOffset.Unknown)
+    if (_maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
       _maxTimestampAndOffsetSoFar = timeIndex.lastEntry
     _maxTimestampAndOffsetSoFar
   }
@@ -161,7 +162,7 @@ class LogSegment private[log] (val log: FileRecords,
       trace(s"Appended $appendedBytes to ${log.file} at end offset 
$largestOffset")
       // Update the in memory max timestamp and corresponding offset.
       if (largestTimestamp > maxTimestampSoFar) {
-        maxTimestampAndOffsetSoFar = TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp)
+        maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp)
       }
       // append an entry to the index (if needed)
       if (bytesSinceLastIndexEntry > indexIntervalBytes) {
@@ -340,7 +341,7 @@ class LogSegment private[log] (val log: FileRecords,
     txnIndex.reset()
     var validBytes = 0
     var lastIndexEntry = 0
-    maxTimestampAndOffsetSoFar = TimestampOffset.Unknown
+    maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN
     try {
       for (batch <- log.batches.asScala) {
         batch.ensureValid()
@@ -348,7 +349,7 @@ class LogSegment private[log] (val log: FileRecords,
 
         // The max timestamp is exposed at the batch level, so no need to iterate the records
         if (batch.maxTimestamp > maxTimestampSoFar) {
-          maxTimestampAndOffsetSoFar = TimestampOffset(batch.maxTimestamp, batch.lastOffset)
+          maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp, batch.lastOffset)
         }
 
         // Build offset index
@@ -393,7 +394,7 @@ class LogSegment private[log] (val log: FileRecords,
     // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
     val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
     if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
-      maxTimestampAndOffsetSoFar = TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset)
+      maxTimestampAndOffsetSoFar = new TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset)
     }
   }
 
@@ -590,7 +591,7 @@ class LogSegment private[log] (val log: FileRecords,
    * Close this log segment
    */
   def close(): Unit = {
-    if (_maxTimestampAndOffsetSoFar != TimestampOffset.Unknown)
+    if (_maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
       CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar,
         skipFullCheck = true), this)
     CoreUtils.swallow(lazyOffsetIndex.close(), this)
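
Note how the getter hunk above uses the sentinel as a lazy-initialization
marker: the in-memory maximum is recomputed from the last time-index entry
only while it is still UNKNOWN. A sketch of the pattern with simplified names
(not the actual LogSegment code):

    import org.apache.kafka.server.log.internals.TimestampOffset

    class MaxTimestampTracker(lastTimeIndexEntry: () => TimestampOffset) {
      @volatile private var cached: TimestampOffset = TimestampOffset.UNKNOWN

      def maxSoFar: TimestampOffset = {
        if (cached == TimestampOffset.UNKNOWN)
          cached = lastTimeIndexEntry()  // recover from the time index on first access
        cached
      }
    }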
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 62afbac930e..3b3ea461fbd 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -19,10 +19,10 @@ package kafka.log
 
 import java.io.File
 import java.nio.ByteBuffer
-
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.InvalidOffsetException
+import org.apache.kafka.server.log.internals.{CorruptIndexException, OffsetPosition}
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -68,7 +68,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   private def lastEntry: OffsetPosition = {
     inLock(lock) {
       _entries match {
-        case 0 => OffsetPosition(baseOffset, 0)
+        case 0 => new OffsetPosition(baseOffset, 0)
         case s => parseEntry(mmap, s - 1)
       }
     }
@@ -90,7 +90,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
       val idx = mmap.duplicate
       val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
       if(slot == -1)
-        OffsetPosition(baseOffset, 0)
+        new OffsetPosition(baseOffset, 0)
       else
         parseEntry(idx, slot)
     }
@@ -117,7 +117,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
 
   override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
-    OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
+    new OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
   }
 
   /**
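
The physical(...) helper above implies the offset index's on-disk layout: each
entry is 8 bytes, a 4-byte offset relative to baseOffset followed by a 4-byte
file position. A sketch of the decode performed by parseEntry, assuming that
layout (the standalone helper below is illustrative):

    import java.nio.ByteBuffer
    import org.apache.kafka.server.log.internals.OffsetPosition

    def parseOffsetEntry(buffer: ByteBuffer, n: Int, baseOffset: Long): OffsetPosition = {
      val entrySize = 8
      val relativeOffset = buffer.getInt(n * entrySize)  // offset - baseOffset, stored as an Int
      val position = buffer.getInt(n * entrySize + 4)    // physical position in the log file
      new OffsetPosition(baseOffset + relativeOffset, position)
    }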
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 2c464d602ff..6b22e2f3eaf 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -19,11 +19,11 @@ package kafka.log
 
 import java.io.File
 import java.nio.ByteBuffer
-
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.InvalidOffsetException
 import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset}
 
 /**
  * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
@@ -76,7 +76,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
   private def lastEntryFromIndexFile: TimestampOffset = {
     inLock(lock) {
       _entries match {
-        case 0 => TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
+        case 0 => new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
         case s => parseEntry(mmap, s - 1)
       }
     }
@@ -97,7 +97,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
   }
 
   override def parseEntry(buffer: ByteBuffer, n: Int): TimestampOffset = {
-    TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
+    new TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
   }
 
   /**
@@ -134,7 +134,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
         mmap.putLong(timestamp)
         mmap.putInt(relativeOffset(offset))
         _entries += 1
-        _lastEntry = TimestampOffset(timestamp, offset)
+        _lastEntry = new TimestampOffset(timestamp, offset)
         require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
       }
     }
@@ -153,7 +153,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
       val idx = mmap.duplicate
       val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
       if (slot == -1)
-        TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
+        new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
       else
         parseEntry(idx, slot)
     }
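
Likewise, the maybeAppend hunk above (putLong, then putInt) shows the time
index's 12-byte entry layout: an 8-byte timestamp followed by a 4-byte offset
relative to baseOffset. A sketch of the matching decode, assuming that layout
(the standalone helper below is illustrative):

    import java.nio.ByteBuffer
    import org.apache.kafka.server.log.internals.TimestampOffset

    def parseTimeEntry(buffer: ByteBuffer, n: Int, baseOffset: Long): TimestampOffset = {
      val entrySize = 12
      val timestamp = buffer.getLong(n * entrySize)          // 8-byte timestamp
      val relativeOffset = buffer.getInt(n * entrySize + 8)  // 4-byte relative offset
      new TimestampOffset(timestamp, baseOffset + relativeOffset)
    }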
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index d7967e0213a..72321db31ee 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -24,6 +24,7 @@ import kafka.utils.{Logging, nonthreadsafe}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.internals.CorruptIndexException
 
 import scala.collection.mutable.ListBuffer
 
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 96cff3f9cd0..d2a64373d75 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -19,16 +19,16 @@ package kafka.log
 
 import java.io._
 import java.nio.file.Files
-
 import org.junit.jupiter.api.Assertions._
-import java.util.{Arrays, Collections}
 
+import java.util.{Arrays, Collections}
 import org.junit.jupiter.api._
 
 import scala.collection._
 import scala.util.Random
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.InvalidOffsetException
+import org.apache.kafka.server.log.internals.OffsetPosition
 
 import scala.annotation.nowarn
 
@@ -52,7 +52,7 @@ class OffsetIndexTest {
   @nowarn("cat=deprecation")
   @Test
   def randomLookupTest(): Unit = {
-    assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(92L),
+    assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(92L),
       "Not present value should return physical offset 0.")
     
     // append some random values
@@ -63,7 +63,7 @@ class OffsetIndexTest {
     
     // should be able to find all those values
     for((logical, physical) <- vals)
-      assertEquals(OffsetPosition(logical, physical), idx.lookup(logical),
+      assertEquals(new OffsetPosition(logical, physical), idx.lookup(logical),
         "Should be able to find values that are present.")
       
     // for non-present values we should find the offset of the largest value less than or equal to this
@@ -73,9 +73,9 @@ class OffsetIndexTest {
     for(offset <- offsets.take(30)) {
       val rightAnswer = 
         if(offset < valMap.firstKey)
-          OffsetPosition(idx.baseOffset, 0)
+          new OffsetPosition(idx.baseOffset, 0)
         else
-          OffsetPosition(valMap.to(offset).last._1, valMap.to(offset).last._2._2)
+          new OffsetPosition(valMap.to(offset).last._1, valMap.to(offset).last._2._2)
       assertEquals(rightAnswer, idx.lookup(offset),
         "The index should give the same answer as the sorted map")
     }
@@ -83,13 +83,13 @@ class OffsetIndexTest {
   
   @Test
   def lookupExtremeCases(): Unit = {
-    assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset),
+    assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset),
       "Lookup on empty file")
     for(i <- 0 until idx.maxEntries)
       idx.append(idx.baseOffset + i + 1, i)
     // check first and last entry
-    assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset))
-    assertEquals(OffsetPosition(idx.baseOffset + idx.maxEntries, idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries))
+    assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset))
+    assertEquals(new OffsetPosition(idx.baseOffset + idx.maxEntries, idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries))
   }
 
   @Test
@@ -97,7 +97,7 @@ class OffsetIndexTest {
     for (i <- 0 until idx.maxEntries)
       idx.append(idx.baseOffset + i + 1, i)
     for (i <- 0 until idx.maxEntries)
-      assertEquals(OffsetPosition(idx.baseOffset + i + 1, i), idx.entry(i))
+      assertEquals(new OffsetPosition(idx.baseOffset + i + 1, i), idx.entry(i))
   }
 
   @Test
@@ -122,10 +122,10 @@ class OffsetIndexTest {
 
   @Test
   def testFetchUpperBoundOffset(): Unit = {
-    val first = OffsetPosition(baseOffset + 0, 0)
-    val second = OffsetPosition(baseOffset + 1, 10)
-    val third = OffsetPosition(baseOffset + 2, 23)
-    val fourth = OffsetPosition(baseOffset + 3, 37)
+    val first = new OffsetPosition(baseOffset + 0, 0)
+    val second = new OffsetPosition(baseOffset + 1, 10)
+    val third = new OffsetPosition(baseOffset + 2, 23)
+    val fourth = new OffsetPosition(baseOffset + 3, 37)
 
     assertEquals(None, idx.fetchUpperBoundOffset(first, 5))
 
@@ -144,8 +144,8 @@ class OffsetIndexTest {
 
   @Test
   def testReopen(): Unit = {
-    val first = OffsetPosition(51, 0)
-    val sec = OffsetPosition(52, 1)
+    val first = new OffsetPosition(51, 0)
+    val sec = new OffsetPosition(52, 1)
     idx.append(first.offset, first.position)
     idx.append(sec.offset, sec.position)
     idx.close()
@@ -166,28 +166,28 @@ class OffsetIndexTest {
       
     // now check the last offset after various truncate points and validate that we can still append to the index.
     idx.truncateTo(12)
-    assertEquals(OffsetPosition(9, 9), idx.lookup(10),
+    assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
       "Index should be unchanged by truncate past the end")
     assertEquals(9, idx.lastOffset,
       "9 should be the last entry in the index")
     
     idx.append(10, 10)
     idx.truncateTo(10)
-    assertEquals(OffsetPosition(9, 9), idx.lookup(10),
+    assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
       "Index should be unchanged by truncate at the end")
     assertEquals(9, idx.lastOffset,
       "9 should be the last entry in the index")
     idx.append(10, 10)
     
     idx.truncateTo(9)
-    assertEquals(OffsetPosition(8, 8), idx.lookup(10),
+    assertEquals(new OffsetPosition(8, 8), idx.lookup(10),
       "Index should truncate off last entry")
     assertEquals(8, idx.lastOffset,
       "8 should be the last entry in the index")
     idx.append(9, 9)
     
     idx.truncateTo(5)
-    assertEquals(OffsetPosition(4, 4), idx.lookup(10),
+    assertEquals(new OffsetPosition(4, 4), idx.lookup(10),
       "4 should be the last entry in the index")
     assertEquals(4, idx.lastOffset,
       "4 should be the last entry in the index")
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
index 04d4adfab7c..49ce3f64ba6 100644
--- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -18,9 +18,9 @@
 package kafka.log
 
 import java.io.File
-
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.InvalidOffsetException
+import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 
@@ -46,26 +46,26 @@ class TimeIndexTest {
   @Test
   def testLookUp(): Unit = {
     // Empty time index
-    assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(100L))
+    assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(100L))
 
     // Add several time index entries.
     appendEntries(maxEntries - 1)
 
     // look for timestamp smaller than the earliest entry
-    assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(9))
+    assertEquals(new TimestampOffset(-1L, baseOffset), idx.lookup(9))
     // look for timestamp in the middle of two entries.
-    assertEquals(TimestampOffset(20L, 65L), idx.lookup(25))
+    assertEquals(new TimestampOffset(20L, 65L), idx.lookup(25))
     // look for timestamp same as the one in the entry
-    assertEquals(TimestampOffset(30L, 75L), idx.lookup(30))
+    assertEquals(new TimestampOffset(30L, 75L), idx.lookup(30))
   }
 
   @Test
   def testEntry(): Unit = {
     appendEntries(maxEntries - 1)
-    assertEquals(TimestampOffset(10L, 55L), idx.entry(0))
-    assertEquals(TimestampOffset(20L, 65L), idx.entry(1))
-    assertEquals(TimestampOffset(30L, 75L), idx.entry(2))
-    assertEquals(TimestampOffset(40L, 85L), idx.entry(3))
+    assertEquals(new TimestampOffset(10L, 55L), idx.entry(0))
+    assertEquals(new TimestampOffset(20L, 65L), idx.entry(1))
+    assertEquals(new TimestampOffset(30L, 75L), idx.entry(2))
+    assertEquals(new TimestampOffset(40L, 85L), idx.entry(3))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 1f0f91aa2ad..9c1cbe3f307 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -18,6 +18,7 @@ package kafka.log
 
 import kafka.utils.TestUtils
 import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.server.log.internals.CorruptIndexException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
diff --git a/core/src/main/scala/kafka/log/CorruptIndexException.scala b/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptIndexException.java
similarity index 67%
copy from core/src/main/scala/kafka/log/CorruptIndexException.scala
copy to storage/src/main/java/org/apache/kafka/server/log/internals/CorruptIndexException.java
index b39ee5b3bbb..cf3880598a2 100644
--- a/core/src/main/scala/kafka/log/CorruptIndexException.scala
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptIndexException.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,7 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.server.log.internals;
 
-package kafka.log
-
-class CorruptIndexException(message: String) extends RuntimeException(message)
+public class CorruptIndexException extends RuntimeException {
+    public CorruptIndexException(String message) {
+        super(message);
+    }
+}
diff --git a/core/src/main/scala/kafka/log/CorruptIndexException.scala b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexEntry.java
similarity index 72%
rename from core/src/main/scala/kafka/log/CorruptIndexException.scala
rename to storage/src/main/java/org/apache/kafka/server/log/internals/IndexEntry.java
index b39ee5b3bbb..30b42ed4389 100644
--- a/core/src/main/scala/kafka/log/CorruptIndexException.scala
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexEntry.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.server.log.internals;
 
-package kafka.log
-
-class CorruptIndexException(message: String) extends RuntimeException(message)
+public interface IndexEntry {
+    long indexKey();
+    long indexValue();
+}
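
The sealed Scala trait becomes a plain Java interface, and the primitive long
accessors preserve the deleted comment's intent (use long for both key and
value to avoid boxing). A sketch of code that stays generic over both entry
types:

    import org.apache.kafka.server.log.internals.{IndexEntry, OffsetPosition, TimestampOffset}

    // Both implementations return primitive longs, so no boxing on the hot path.
    def keyOf(entry: IndexEntry): Long = entry.indexKey

    keyOf(new OffsetPosition(100L, 0))     // 100: the offset is the key
    keyOf(new TimestampOffset(10L, 100L))  // 10: the timestamp is the key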
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetPosition.java b/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetPosition.java
new file mode 100644
index 00000000000..7fa017c612a
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetPosition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+/**
+ * The mapping between a logical log offset and the physical position
+ * in some log file of the beginning of the message set entry with the
+ * given offset.
+ */
+public final class OffsetPosition implements IndexEntry {
+    public final long offset;
+    public final int position;
+
+    public OffsetPosition(long offset, int position) {
+        this.offset = offset;
+        this.position = position;
+    }
+
+    @Override
+    public long indexKey() {
+        return offset;
+    }
+
+    @Override
+    public long indexValue() {
+        return position;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        OffsetPosition that = (OffsetPosition) o;
+
+        return offset == that.offset
+            && position == that.position;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Long.hashCode(offset);
+        result = 31 * result + position;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "OffsetPosition(" +
+            "offset=" + offset +
+            ", position=" + position +
+            ')';
+    }
+}
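
Because Java classes do not derive structural equality, the explicit equals and
hashCode above stand in for what the Scala case class generated automatically;
the assertEquals-based test updates earlier in this diff rely on that. A quick
sanity sketch (values are hypothetical):

    import org.apache.kafka.server.log.internals.OffsetPosition

    // Structural equality matches the old case-class semantics.
    assert(new OffsetPosition(5L, 9) == new OffsetPosition(5L, 9))
    assert(new OffsetPosition(5L, 9).hashCode == new OffsetPosition(5L, 9).hashCode)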
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TimestampOffset.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TimestampOffset.java
new file mode 100644
index 00000000000..77b1bbe69a0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TimestampOffset.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+/**
+ * The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater
+ * than that timestamp must be at or after that offset.
+ */
+public class TimestampOffset implements IndexEntry {
+
+    public static final TimestampOffset UNKNOWN = new TimestampOffset(-1, -1);
+
+    public final long timestamp;
+    public final long offset;
+
+    /**
+     * Create a TimestampOffset with the provided parameters.
+     *
+     * @param timestamp The max timestamp before the given offset.
+     * @param offset The message offset.
+     */
+    public TimestampOffset(long timestamp, long offset) {
+        this.timestamp = timestamp;
+        this.offset = offset;
+    }
+
+    @Override
+    public long indexKey() {
+        return timestamp;
+    }
+
+    @Override
+    public long indexValue() {
+        return offset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TimestampOffset that = (TimestampOffset) o;
+
+        return timestamp == that.timestamp
+            && offset == that.offset;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Long.hashCode(timestamp);
+        result = 31 * result + Long.hashCode(offset);
+        return result;
+    }
+}
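
The new UNKNOWN constant hard-codes -1/-1 instead of referencing
ListOffsetsResponse as the deleted Scala companion object did, which drops the
storage module's dependency on the requests classes. The values are expected
to line up, since both ListOffsetsResponse constants are -1:

    import org.apache.kafka.common.requests.ListOffsetsResponse
    import org.apache.kafka.server.log.internals.TimestampOffset

    assert(TimestampOffset.UNKNOWN.timestamp == ListOffsetsResponse.UNKNOWN_TIMESTAMP)  // -1L
    assert(TimestampOffset.UNKNOWN.offset == ListOffsetsResponse.UNKNOWN_OFFSET)        // -1L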
