This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new e2678d57d09  KAFKA-14472: Move TransactionIndex and related to storage module (#12996)
e2678d57d09 is described below

commit e2678d57d0919aaa97effe2ee7591cd6b85b5303
Author:     Ismael Juma <ism...@juma.me.uk>
AuthorDate: Mon Dec 19 11:31:37 2022 -0800

    KAFKA-14472: Move TransactionIndex and related to storage module (#12996)

    For broader context on this change, please check:

    * KAFKA-14470: Move log layer to storage module

    Reviewers: Jun Rao <jun...@gmail.com>, Satish Duggana <sati...@apache.org>
---
 core/src/main/scala/kafka/log/LocalLog.scala       |  10 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |   3 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |   2 +-
 .../scala/kafka/log/ProducerStateManager.scala     |   3 +-
 .../main/scala/kafka/log/TransactionIndex.scala    | 264 ---------------------
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  21 +-
 .../scala/kafka/log/remote/RemoteIndexCache.scala  |   2 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   3 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   5 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |   1 +
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |   4 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   6 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |   9 +-
 .../unit/kafka/log/TransactionIndexTest.scala      |  80 ++++---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   1 +
 .../kafka/server/log/internals/AbortedTxn.java     | 117 +++++++++
 .../kafka/server/log/internals/CompletedTxn.java   |  75 ++++++
 .../server/log/internals/TransactionIndex.java     | 264 +++++++++++++++++++++
 .../server/log/internals/TxnIndexSearchResult.java |  30 +++
 19 files changed, 556 insertions(+), 344 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 56172333351..68bb9d9f8b0 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition}

 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, immutable}
@@ -448,7 +448,7 @@ class LocalLog(@volatile private var _dir: File,
     }

     val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
-    def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
+    def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
     collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)

     FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
@@ -459,13 +459,13 @@ class LocalLog(@volatile private var _dir: File,
   private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
                                          startingSegment: LogSegment,
-                                         accumulator: List[AbortedTxn] => Unit): Unit = {
+                                         accumulator: Seq[AbortedTxn] => Unit): Unit = {
     val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
     var segmentEntryOpt = Option(startingSegment)
     while (segmentEntryOpt.isDefined) {
       val segment = segmentEntryOpt.get
       val searchResult = segment.collectAbortedTxns(startOffset,
upperBoundOffset) - accumulator(searchResult.abortedTransactions) + accumulator(searchResult.abortedTransactions.asScala) if (searchResult.isComplete) return segmentEntryOpt = nextOption(higherSegments) @@ -475,7 +475,7 @@ class LocalLog(@volatile private var _dir: File, private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { val segmentEntry = segments.floorSegment(baseOffset) val allAbortedTxns = ListBuffer.empty[AbortedTxn] - def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns + def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator)) allAbortedTxns.toList } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 4ad0ea2d853..8bafc0aae60 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} +import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer @@ -1123,7 +1124,7 @@ private[log] class CleanedTransactionMetadata { private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata] // Minheap of aborted transactions sorted by the transaction first offset private val abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] { - override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset + override def compare(x: AbortedTxn, y: AbortedTxn): Int = java.lang.Long.compare(x.firstOffset, y.firstOffset) }.reverse) // Output cleaned index to write retained aborted transactions diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4da227556f5..0c2a013e11f 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} -import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset} +import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult} import scala.jdk.CollectionConverters._ import scala.math._ diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 835b74066b6..7307bed0efa 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.types._ import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch} import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils} +import 
org.apache.kafka.server.log.internals.CompletedTxn import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer @@ -318,7 +319,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, // without any associated data will not have any impact on the last stable offset // and would not need to be reflected in the transaction index. val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset => - CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT) + new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT) } updatedEntry.maybeUpdateProducerEpoch(producerEpoch) diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala deleted file mode 100644 index 72321db31ee..00000000000 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log - -import java.io.{Closeable, File, IOException} -import java.nio.ByteBuffer -import java.nio.channels.FileChannel -import java.nio.file.{Files, StandardOpenOption} -import kafka.utils.{Logging, nonthreadsafe} -import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.message.FetchResponseData -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.CorruptIndexException - -import scala.collection.mutable.ListBuffer - -private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTxn], isComplete: Boolean) - -/** - * The transaction index maintains metadata about the aborted transactions for each segment. This includes - * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of - * the abort. This index is used to find the aborted transactions in the range of a given fetch request at - * the READ_COMMITTED isolation level. - * - * There is at most one transaction index for each log segment. The entries correspond to the transactions - * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions - * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in - * order to find the start of the transactions. 
- */ -@nonthreadsafe -class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Closeable with Logging { - - // note that the file is not created until we need it - @volatile private var maybeChannel: Option[FileChannel] = None - private var lastOffset: Option[Long] = None - - if (_file.exists) - openChannel() - - def append(abortedTxn: AbortedTxn): Unit = { - lastOffset.foreach { offset => - if (offset >= abortedTxn.lastOffset) - throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " + - s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}") - } - lastOffset = Some(abortedTxn.lastOffset) - Utils.writeFully(channel(), abortedTxn.buffer.duplicate()) - } - - def flush(): Unit = maybeChannel.foreach(_.force(true)) - - def file: File = _file - - def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) - - /** - * Delete this index. - * - * @throws IOException if deletion fails due to an I/O error - * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did - * not exist - */ - def deleteIfExists(): Boolean = { - close() - Files.deleteIfExists(file.toPath) - } - - private def channel(): FileChannel = { - maybeChannel match { - case Some(channel) => channel - case None => openChannel() - } - } - - private def openChannel(): FileChannel = { - val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ, - StandardOpenOption.WRITE) - maybeChannel = Some(channel) - channel.position(channel.size) - channel - } - - /** - * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time. - */ - def reset(): Unit = { - maybeChannel.foreach(_.truncate(0)) - lastOffset = None - } - - def close(): Unit = { - maybeChannel.foreach(_.close()) - maybeChannel = None - } - - def renameTo(f: File): Unit = { - try { - if (file.exists) - Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) - } finally _file = f - } - - def truncateTo(offset: Long): Unit = { - val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize) - var newLastOffset: Option[Long] = None - for ((abortedTxn, position) <- iterator(() => buffer)) { - if (abortedTxn.lastOffset >= offset) { - channel().truncate(position) - lastOffset = newLastOffset - return - } - newLastOffset = Some(abortedTxn.lastOffset) - } - } - - private def iterator(allocate: () => ByteBuffer = () => ByteBuffer.allocate(AbortedTxn.TotalSize)): Iterator[(AbortedTxn, Int)] = { - maybeChannel match { - case None => Iterator.empty - case Some(channel) => - var position = 0 - - new Iterator[(AbortedTxn, Int)] { - override def hasNext: Boolean = channel.position - position >= AbortedTxn.TotalSize - - override def next(): (AbortedTxn, Int) = { - try { - val buffer = allocate() - Utils.readFully(channel, buffer, position) - buffer.flip() - - val abortedTxn = new AbortedTxn(buffer) - if (abortedTxn.version > AbortedTxn.CurrentVersion) - throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " + - s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}") - val nextEntry = (abortedTxn, position) - position += AbortedTxn.TotalSize - nextEntry - } catch { - case e: IOException => - // We received an unexpected error reading from the index file. 
We propagate this as an - // UNKNOWN error to the consumer, which will cause it to retry the fetch. - throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e) - } - } - } - } - } - - def allAbortedTxns: List[AbortedTxn] = { - iterator().map(_._1).toList - } - - /** - * Collect all aborted transactions which overlap with a given fetch range. - * - * @param fetchOffset Inclusive first offset of the fetch range - * @param upperBoundOffset Exclusive last offset in the fetch range - * @return An object containing the aborted transactions and whether the search needs to continue - * into the next log segment. - */ - def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = { - val abortedTransactions = ListBuffer.empty[AbortedTxn] - for ((abortedTxn, _) <- iterator()) { - if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset) - abortedTransactions += abortedTxn - - if (abortedTxn.lastStableOffset >= upperBoundOffset) - return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true) - } - TxnIndexSearchResult(abortedTransactions.toList, isComplete = false) - } - - /** - * Do a basic sanity check on this index to detect obvious problems. - * - * @throws CorruptIndexException if any problems are found. - */ - def sanityCheck(): Unit = { - val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize) - for ((abortedTxn, _) <- iterator(() => buffer)) { - if (abortedTxn.lastOffset < startOffset) - throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " + - s"${file.getAbsolutePath} is less than start offset $startOffset") - } - } - -} - -private[log] object AbortedTxn { - val VersionOffset = 0 - val VersionSize = 2 - val ProducerIdOffset = VersionOffset + VersionSize - val ProducerIdSize = 8 - val FirstOffsetOffset = ProducerIdOffset + ProducerIdSize - val FirstOffsetSize = 8 - val LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize - val LastOffsetSize = 8 - val LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize - val LastStableOffsetSize = 8 - val TotalSize = LastStableOffsetOffset + LastStableOffsetSize - - val CurrentVersion: Short = 0 -} - -private[log] class AbortedTxn(val buffer: ByteBuffer) { - import AbortedTxn._ - - def this(producerId: Long, - firstOffset: Long, - lastOffset: Long, - lastStableOffset: Long) = { - this(ByteBuffer.allocate(AbortedTxn.TotalSize)) - buffer.putShort(CurrentVersion) - buffer.putLong(producerId) - buffer.putLong(firstOffset) - buffer.putLong(lastOffset) - buffer.putLong(lastStableOffset) - buffer.flip() - } - - def this(completedTxn: CompletedTxn, lastStableOffset: Long) = - this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset) - - def version: Short = buffer.get(VersionOffset) - - def producerId: Long = buffer.getLong(ProducerIdOffset) - - def firstOffset: Long = buffer.getLong(FirstOffsetOffset) - - def lastOffset: Long = buffer.getLong(LastOffsetOffset) - - def lastStableOffset: Long = buffer.getLong(LastStableOffsetOffset) - - def asAbortedTransaction: FetchResponseData.AbortedTransaction = new FetchResponseData.AbortedTransaction() - .setProducerId(producerId) - .setFirstOffset(firstOffset) - - override def toString: String = - s"AbortedTxn(version=$version, producerId=$producerId, firstOffset=$firstOffset, " + - s"lastOffset=$lastOffset, lastStableOffset=$lastStableOffset)" - - override def equals(any: Any): Boolean = { - any match { - case that: AbortedTxn => 
this.buffer.equals(that.buffer) - case _ => false - } - } - - override def hashCode(): Int = buffer.hashCode -} diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 8830258c8fe..e1f49cceb6e 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 +import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn} import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import scala.annotation.nowarn @@ -159,26 +160,6 @@ case class LogReadInfo(fetchedData: FetchDataInfo, logEndOffset: Long, lastStableOffset: Long) -/** - * A class used to hold useful metadata about a completed transaction. This is used to build - * the transaction index after appending to the log. - * - * @param producerId The ID of the producer - * @param firstOffset The first offset (inclusive) of the transaction - * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the - * COMMIT/ABORT control record which indicates the transaction's completion. - * @param isAborted Whether or not the transaction was aborted - */ -case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) { - override def toString: String = { - "CompletedTxn(" + - s"producerId=$producerId, " + - s"firstOffset=$firstOffset, " + - s"lastOffset=$lastOffset, " + - s"isAborted=$isAborted)" - } -} - /** * A class used to hold params required to decide to rotate a log segment or not. 
*/ diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala index 9dfaaa5539c..594b74f19ff 100644 --- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala +++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala @@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread} import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.OffsetPosition +import org.apache.kafka.server.log.internals.{OffsetPosition, TransactionIndex} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager} diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index c82523eff4b..fe8bfd98d41 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.metadata.bootstrap.BootstrapDirectory +import org.apache.kafka.server.log.internals.TransactionIndex import org.apache.kafka.snapshot.Snapshots import scala.jdk.CollectionConverters._ @@ -94,7 +95,7 @@ object DumpLogSegments { private def dumpTxnIndex(file: File): Unit = { val index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file) - for (abortedTxn <- index.allAbortedTxns) { + for (abortedTxn <- index.allAbortedTxns.asScala) { println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " + s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}") } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 819278bd280..071e8b8fd1c 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.internals.AbortedTxn import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} @@ -328,8 +329,8 @@ class LogCleanerTest { assertEquals(20L, log.logEndOffset) val expectedAbortedTxns = List( - new AbortedTxn(producerId=producerId1, firstOffset=8, lastOffset=10, lastStableOffset=11), - new AbortedTxn(producerId=producerId2, firstOffset=11, lastOffset=16, lastStableOffset=17) + new AbortedTxn(producerId1, 8, 10, 11), + new AbortedTxn(producerId2, 11, 16, 17) ) assertAllTransactionsComplete(log) diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index ff7cc00ffe9..269dff643b5 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.common.MetadataVersion import 
org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 +import org.apache.kafka.server.log.internals.AbortedTxn import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index c31099fa928..c1776a4344c 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -346,7 +346,7 @@ class LogSegmentTest { var abortedTxns = segment.txnIndex.allAbortedTxns assertEquals(1, abortedTxns.size) - var abortedTxn = abortedTxns.head + var abortedTxn = abortedTxns.get(0) assertEquals(pid2, abortedTxn.producerId) assertEquals(102L, abortedTxn.firstOffset) assertEquals(106L, abortedTxn.lastOffset) @@ -362,7 +362,7 @@ class LogSegmentTest { abortedTxns = segment.txnIndex.allAbortedTxns assertEquals(1, abortedTxns.size) - abortedTxn = abortedTxns.head + abortedTxn = abortedTxns.get(0) assertEquals(pid2, abortedTxn.producerId) assertEquals(75L, abortedTxn.firstOffset) assertEquals(106L, abortedTxn.lastOffset) diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 8a73c8bb943..a293ed9eac9 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -21,7 +21,6 @@ import kafka.log.remote.RemoteLogManager import java.io.File import java.util.Properties - import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel} import kafka.utils.{Scheduler, TestUtils} @@ -29,10 +28,11 @@ import org.apache.kafka.common.Uuid import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.utils.{Time, Utils} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} + import java.nio.file.Files import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} - import kafka.log +import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex} import scala.collection.Iterable import scala.jdk.CollectionConverters._ @@ -237,7 +237,7 @@ object LogTestUtils { log.read(startOffset, maxLength, isolation, minOneMessage) } - def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) + def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns.asScala) def deleteProducerSnapshotFiles(logDir: File): Unit = { val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(UnifiedLog.ProducerSnapshotFileSuffix)) diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index b631b642ef4..c808d03a72e 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Utils} +import 
org.apache.kafka.server.log.internals.CompletedTxn import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.Mockito.{mock, when} @@ -246,7 +247,7 @@ class ProducerStateManagerTest { // incomplete transaction val secondAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.Client) val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21, secondAppend) - assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)), firstCompletedTxn) + assertEquals(Some(new CompletedTxn(producerId, 16L, 21, false)), firstCompletedTxn) assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22, secondAppend)) assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend)) appendData(24L, 27L, secondAppend) @@ -392,21 +393,21 @@ class ProducerStateManagerTest { beginTxn(producerId3, startOffset3) val lastOffset1 = startOffset3 + 15 - val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false) + val completedTxn1 = new CompletedTxn(producerId1, startOffset1, lastOffset1, false) assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1)) stateManager.completeTxn(completedTxn1) stateManager.onHighWatermarkUpdated(lastOffset1 + 1) assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset)) val lastOffset3 = lastOffset1 + 20 - val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false) + val completedTxn3 = new CompletedTxn(producerId3, startOffset3, lastOffset3, false) assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3)) stateManager.completeTxn(completedTxn3) stateManager.onHighWatermarkUpdated(lastOffset3 + 1) assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset)) val lastOffset2 = lastOffset3 + 78 - val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false) + val completedTxn2 = new CompletedTxn(producerId2, startOffset2, lastOffset2, false) assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2)) stateManager.completeTxn(completedTxn2) stateManager.onHighWatermarkUpdated(lastOffset2 + 1) diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala index 9c1cbe3f307..6784e76a8e5 100644 --- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala @@ -18,11 +18,13 @@ package kafka.log import kafka.utils.TestUtils import org.apache.kafka.common.message.FetchResponseData -import org.apache.kafka.server.log.internals.CorruptIndexException +import org.apache.kafka.server.log.internals.{AbortedTxn, CorruptIndexException, TransactionIndex} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import scala.jdk.CollectionConverters._ import java.io.File +import java.util.Collections class TransactionIndexTest { var file: File = _ @@ -43,26 +45,26 @@ class TransactionIndexTest { @Test def testPositionSetCorrectlyWhenOpened(): Unit = { val abortedTxns = List( - new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11), - new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13), - new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25), - new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, 
lastStableOffset = 40)) + new AbortedTxn(0L, 0, 10, 11), + new AbortedTxn(1L, 5, 15, 13), + new AbortedTxn(2L, 18, 35, 25), + new AbortedTxn(3L, 32, 50, 40)) abortedTxns.foreach(index.append) index.close() val reopenedIndex = new TransactionIndex(0L, file) - val anotherAbortedTxn = new AbortedTxn(producerId = 3L, firstOffset = 50, lastOffset = 60, lastStableOffset = 55) + val anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55) reopenedIndex.append(anotherAbortedTxn) - assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns) + assertEquals((abortedTxns ++ List(anotherAbortedTxn)).asJava, reopenedIndex.allAbortedTxns) } @Test def testSanityCheck(): Unit = { val abortedTxns = List( - new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11), - new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13), - new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25), - new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40)) + new AbortedTxn(0L, 0, 10, 11), + new AbortedTxn(1L, 5, 15, 13), + new AbortedTxn(2L, 18, 35, 25), + new AbortedTxn(3L, 32, 50, 40)) abortedTxns.foreach(index.append) index.close() @@ -73,71 +75,71 @@ class TransactionIndexTest { @Test def testLastOffsetMustIncrease(): Unit = { - index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13)) - assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, - lastOffset = 15, lastStableOffset = 11))) + index.append(new AbortedTxn(1L, 5, 15, 13)) + assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0, + 15, 11))) } @Test def testLastOffsetCannotDecrease(): Unit = { - index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13)) - assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, - lastOffset = 10, lastStableOffset = 11))) + index.append(new AbortedTxn(1L, 5, 15, 13)) + assertThrows(classOf[IllegalArgumentException], () => index.append(new AbortedTxn(0L, 0, + 10, 11))) } @Test def testCollectAbortedTransactions(): Unit = { val abortedTransactions = List( - new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11), - new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13), - new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25), - new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40)) + new AbortedTxn(0L, 0, 10, 11), + new AbortedTxn(1L, 5, 15, 13), + new AbortedTxn(2L, 18, 35, 25), + new AbortedTxn(3L, 32, 50, 40)) abortedTransactions.foreach(index.append) var result = index.collectAbortedTxns(0L, 100L) - assertEquals(abortedTransactions, result.abortedTransactions) + assertEquals(abortedTransactions.asJava, result.abortedTransactions) assertFalse(result.isComplete) result = index.collectAbortedTxns(0L, 32) - assertEquals(abortedTransactions.take(3), result.abortedTransactions) + assertEquals(abortedTransactions.take(3).asJava, result.abortedTransactions) assertTrue(result.isComplete) result = index.collectAbortedTxns(0L, 35) - assertEquals(abortedTransactions, result.abortedTransactions) + assertEquals(abortedTransactions.asJava, result.abortedTransactions) assertTrue(result.isComplete) result = 
index.collectAbortedTxns(10, 35) - assertEquals(abortedTransactions, result.abortedTransactions) + assertEquals(abortedTransactions.asJava, result.abortedTransactions) assertTrue(result.isComplete) result = index.collectAbortedTxns(11, 35) - assertEquals(abortedTransactions.slice(1, 4), result.abortedTransactions) + assertEquals(abortedTransactions.slice(1, 4).asJava, result.abortedTransactions) assertTrue(result.isComplete) result = index.collectAbortedTxns(20, 41) - assertEquals(abortedTransactions.slice(2, 4), result.abortedTransactions) + assertEquals(abortedTransactions.slice(2, 4).asJava, result.abortedTransactions) assertFalse(result.isComplete) } @Test def testTruncate(): Unit = { val abortedTransactions = List( - new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2), - new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16), - new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25), - new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40)) + new AbortedTxn(0L, 0, 10, 2), + new AbortedTxn(1L, 5, 15, 16), + new AbortedTxn(2L, 18, 35, 25), + new AbortedTxn(3L, 32, 50, 40)) abortedTransactions.foreach(index.append) index.truncateTo(51) - assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions) + assertEquals(abortedTransactions.asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions) index.truncateTo(50) - assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions) + assertEquals(abortedTransactions.take(3).asJava, index.collectAbortedTxns(0L, 100L).abortedTransactions) index.reset() - assertEquals(List.empty[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions) + assertEquals(Collections.emptyList[FetchResponseData.AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions) } @Test @@ -148,7 +150,7 @@ class TransactionIndexTest { val lastStableOffset = 200L val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset) - assertEquals(AbortedTxn.CurrentVersion, abortedTxn.version) + assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version) assertEquals(pid, abortedTxn.producerId) assertEquals(firstOffset, abortedTxn.firstOffset) assertEquals(lastOffset, abortedTxn.lastOffset) @@ -158,15 +160,15 @@ class TransactionIndexTest { @Test def testRenameIndex(): Unit = { val renamed = TestUtils.tempFile() - index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2)) + index.append(new AbortedTxn(0L, 0, 10, 2)) index.renameTo(renamed) - index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16)) + index.append(new AbortedTxn(1L, 5, 15, 16)) val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions assertEquals(2, abortedTxns.size) - assertEquals(0, abortedTxns(0).firstOffset) - assertEquals(5, abortedTxns(1).firstOffset) + assertEquals(0, abortedTxns.get(0).firstOffset) + assertEquals(5, abortedTxns.get(1).firstOffset) } @Test diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 42fdafae206..45335eec8ec 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import 
org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} +import org.apache.kafka.server.log.internals.AbortedTxn import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.Assertions._ diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java b/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java new file mode 100644 index 00000000000..2ad08ce77ba --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/AbortedTxn.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.message.FetchResponseData; + +import java.nio.ByteBuffer; +import java.util.Objects; + +public class AbortedTxn { + static final int VERSION_OFFSET = 0; + static final int VERSION_SIZE = 2; + static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE; + static final int PRODUCER_ID_SIZE = 8; + static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_SIZE; + static final int FIRST_OFFSET_SIZE = 8; + static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET + FIRST_OFFSET_SIZE; + static final int LAST_OFFSET_SIZE = 8; + static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET + LAST_OFFSET_SIZE; + static final int LAST_STABLE_OFFSET_SIZE = 8; + static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET + LAST_STABLE_OFFSET_SIZE; + + public static final short CURRENT_VERSION = 0; + + final ByteBuffer buffer; + + AbortedTxn(ByteBuffer buffer) { + Objects.requireNonNull(buffer); + this.buffer = buffer; + } + + public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) { + this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset); + } + + public AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) { + this(toByteBuffer(producerId, firstOffset, lastOffset, lastStableOffset)); + } + + private static ByteBuffer toByteBuffer(long producerId, long firstOffset, long lastOffset, long lastStableOffset) { + ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE); + buffer.putShort(CURRENT_VERSION); + buffer.putLong(producerId); + buffer.putLong(firstOffset); + buffer.putLong(lastOffset); + buffer.putLong(lastStableOffset); + buffer.flip(); + return buffer; + } + + public short version() { + return buffer.get(VERSION_OFFSET); + } + + public long producerId() { + return buffer.getLong(PRODUCER_ID_OFFSET); + } + + public long firstOffset() { + return 
buffer.getLong(FIRST_OFFSET_OFFSET); + } + + public long lastOffset() { + return buffer.getLong(LAST_OFFSET_OFFSET); + } + + public long lastStableOffset() { + return buffer.getLong(LAST_STABLE_OFFSET_OFFSET); + } + + public FetchResponseData.AbortedTransaction asAbortedTransaction() { + return new FetchResponseData.AbortedTransaction() + .setProducerId(producerId()) + .setFirstOffset(firstOffset()); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + AbortedTxn that = (AbortedTxn) o; + return buffer.equals(that.buffer); + } + + @Override + public int hashCode() { + return buffer.hashCode(); + } + + @Override + public String toString() { + return "AbortedTxn(version=" + version() + + ", producerId=" + producerId() + + ", firstOffset=" + firstOffset() + + ", lastOffset=" + lastOffset() + + ", lastStableOffset=" + lastStableOffset() + + ")"; + } + +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java b/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java new file mode 100644 index 00000000000..1ad6f8854b4 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/CompletedTxn.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +/** + * A class used to hold useful metadata about a completed transaction. This is used to build + * the transaction index after appending to the log. + */ +public class CompletedTxn { + public final long producerId; + public final long firstOffset; + public final long lastOffset; + public final boolean isAborted; + + /** + * Create an instance of this class. + * + * @param producerId The ID of the producer + * @param firstOffset The first offset (inclusive) of the transaction + * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the + * COMMIT/ABORT control record which indicates the transaction's completion. 
+ * @param isAborted Whether the transaction was aborted + */ + public CompletedTxn(long producerId, long firstOffset, long lastOffset, boolean isAborted) { + this.producerId = producerId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + this.isAborted = isAborted; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CompletedTxn that = (CompletedTxn) o; + + return producerId == that.producerId + && firstOffset == that.firstOffset + && lastOffset == that.lastOffset + && isAborted == that.isAborted; + } + + @Override + public int hashCode() { + int result = Long.hashCode(producerId); + result = 31 * result + Long.hashCode(firstOffset); + result = 31 * result + Long.hashCode(lastOffset); + result = 31 * result + Boolean.hashCode(isAborted); + return result; + } + + @Override + public String toString() { + return "CompletedTxn(producerId=" + producerId + + ", firstOffset=" + firstOffset + + ", lastOffset=" + lastOffset + + ", isAborted=" + isAborted + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java new file mode 100644 index 00000000000..646541ae9c1 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TransactionIndex.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Utils; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.function.Supplier; + +/** + * The transaction index maintains metadata about the aborted transactions for each segment. This includes + * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of + * the abort. This index is used to find the aborted transactions in the range of a given fetch request at + * the READ_COMMITTED isolation level. + * + * There is at most one transaction index for each log segment. The entries correspond to the transactions + * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions + * may span multiple segments. 
Recovering the index therefore requires scanning the earlier segments in
+ * order to find the start of the transactions.
+ */
+public class TransactionIndex implements Closeable {
+
+    private static class AbortedTxnWithPosition {
+        final AbortedTxn txn;
+        final int position;
+        AbortedTxnWithPosition(AbortedTxn txn, int position) {
+            this.txn = txn;
+            this.position = position;
+        }
+    }
+
+    private final long startOffset;
+
+    private volatile File file;
+
+    // note that the file is not created until we need it
+    private Optional<FileChannel> maybeChannel = Optional.empty();
+    private OptionalLong lastOffset = OptionalLong.empty();
+
+    public TransactionIndex(long startOffset, File file) throws IOException {
+        this.startOffset = startOffset;
+        this.file = file;
+
+        if (file.exists())
+            openChannel();
+    }
+
+    public File file() {
+        return file;
+    }
+
+    public void updateParentDir(File parentDir) {
+        this.file = new File(parentDir, file.getName());
+    }
+
+    public void append(AbortedTxn abortedTxn) throws IOException {
+        lastOffset.ifPresent(offset -> {
+            if (offset >= abortedTxn.lastOffset())
+                throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but " +
+                    abortedTxn.lastOffset() + " is not greater than current last offset " + offset + " of index " +
+                    file.getAbsolutePath());
+        });
+        lastOffset = OptionalLong.of(abortedTxn.lastOffset());
+        Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
+    }
+
+    public void flush() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.force(true);
+    }
+
+    /**
+     * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
+     */
+    public void reset() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.truncate(0);
+        lastOffset = OptionalLong.empty();
+    }
+
+    public void close() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.close();
+        maybeChannel = Optional.empty();
+    }
+
+    /**
+     * Delete this index.
+     *
+     * @throws IOException if deletion fails due to an I/O error
+     * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
+     *         not exist
+     */
+    public boolean deleteIfExists() throws IOException {
+        close();
+        return Files.deleteIfExists(file.toPath());
+    }
+
+    public void renameTo(File f) throws IOException {
+        try {
+            if (file.exists())
+                Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
+        } finally {
+            this.file = f;
+        }
+    }
+
+    public void truncateTo(long offset) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
+        OptionalLong newLastOffset = OptionalLong.empty();
+        for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+            AbortedTxn abortedTxn = txnWithPosition.txn;
+            long position = txnWithPosition.position;
+            if (abortedTxn.lastOffset() >= offset) {
+                channel().truncate(position);
+                lastOffset = newLastOffset;
+                return;
+            }
+            newLastOffset = OptionalLong.of(abortedTxn.lastOffset());
+        }
+    }
+
+    public List<AbortedTxn> allAbortedTxns() {
+        List<AbortedTxn> result = new ArrayList<>();
+        for (AbortedTxnWithPosition txnWithPosition : iterable())
+            result.add(txnWithPosition.txn);
+        return result;
+    }
+
+    /**
+     * Collect all aborted transactions which overlap with a given fetch range.
+     *
+     * @param fetchOffset Inclusive first offset of the fetch range
+     * @param upperBoundOffset Exclusive last offset in the fetch range
+     * @return An object containing the aborted transactions and whether the search needs to continue
+     *         into the next log segment.
+     */
+    public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBoundOffset) {
+        List<AbortedTxn> abortedTransactions = new ArrayList<>();
+        for (AbortedTxnWithPosition txnWithPosition : iterable()) {
+            AbortedTxn abortedTxn = txnWithPosition.txn;
+            if (abortedTxn.lastOffset() >= fetchOffset && abortedTxn.firstOffset() < upperBoundOffset)
+                abortedTransactions.add(abortedTxn);
+
+            if (abortedTxn.lastStableOffset() >= upperBoundOffset)
+                return new TxnIndexSearchResult(abortedTransactions, true);
+        }
+        return new TxnIndexSearchResult(abortedTransactions, false);
+    }
+
+    /**
+     * Do a basic sanity check on this index to detect obvious problems.
+     *
+     * @throws CorruptIndexException if any problems are found.
+     */
+    public void sanityCheck() {
+        ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
+        for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+            AbortedTxn abortedTxn = txnWithPosition.txn;
+            if (abortedTxn.lastOffset() < startOffset)
+                throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index " +
+                    file.getAbsolutePath() + " is less than start offset " + startOffset);
+        }
+    }
+
+    private FileChannel openChannel() throws IOException {
+        FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
+            StandardOpenOption.READ, StandardOpenOption.WRITE);
+        maybeChannel = Optional.of(channel);
+        channel.position(channel.size());
+        return channel;
+    }
+
+    private FileChannel channel() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel == null)
+            return openChannel();
+        else
+            return channel;
+    }
+
+    private FileChannel channelOrNull() {
+        return maybeChannel.orElse(null);
+    }
+
+    private Iterable<AbortedTxnWithPosition> iterable() {
+        return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
+    }
+
+    private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) {
+        FileChannel channel = channelOrNull();
+        if (channel == null)
+            return Collections.emptyList();
+
+        PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
+
+        return () -> new Iterator<AbortedTxnWithPosition>() {
+
+            @Override
+            public boolean hasNext() {
+                try {
+                    return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE;
+                } catch (IOException e) {
+                    throw new KafkaException("Failed to read position from the transaction index " + file.getAbsolutePath(), e);
+                }
+            }
+
+            @Override
+            public AbortedTxnWithPosition next() {
+                try {
+                    ByteBuffer buffer = allocate.get();
+                    Utils.readFully(channel, buffer, position.value);
+                    buffer.flip();
+
+                    AbortedTxn abortedTxn = new AbortedTxn(buffer);
+                    if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
+                        throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version() +
+                            " in transaction index " + file.getAbsolutePath() + ", current version is " +
+                            AbortedTxn.CURRENT_VERSION);
+                    AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value);
+                    position.value += AbortedTxn.TOTAL_SIZE;
+                    return nextEntry;
+                } catch (IOException e) {
+                    // We received an unexpected error reading from the index file. We propagate this as an
+                    // UNKNOWN error to the consumer, which will cause it to retry the fetch.
+                    throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e);
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java
new file mode 100644
index 00000000000..c1d40501af7
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnIndexSearchResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TxnIndexSearchResult {
+    public final List<AbortedTxn> abortedTransactions;
+    public final boolean isComplete;
+
+    public TxnIndexSearchResult(List<AbortedTxn> abortedTransactions, boolean isComplete) {
+        this.abortedTransactions = Collections.unmodifiableList(abortedTransactions);
+        this.isComplete = isComplete;
+    }
+}