This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2dec39d6e49 KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043) 2dec39d6e49 is described below commit 2dec39d6e49da4cfb502da3e84d4f9c50508e809 Author: Satish Duggana <sati...@apache.org> AuthorDate: Sun Jan 8 09:43:38 2023 +0530 KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (#13043) For broader context on this change, see: * KAFKA-14470: Move log layer to storage module. Reviewers: Ismael Juma <ism...@juma.me.uk> --- core/src/main/scala/kafka/log/LogCleaner.scala | 9 +- core/src/main/scala/kafka/log/LogSegment.scala | 3 +- .../scala/kafka/log/ProducerStateManager.scala | 346 +-------------------- core/src/main/scala/kafka/log/UnifiedLog.scala | 19 +- .../main/scala/kafka/tools/DumpLogSegments.scala | 2 +- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 8 +- .../unit/kafka/log/ProducerStateManagerTest.scala | 77 +++-- .../kafka/server/log/internals/BatchMetadata.java | 79 +++++ .../kafka/server/log/internals/LastRecord.java | 59 ++++ .../server/log/internals/ProducerAppendInfo.java | 239 ++++++++++++++ .../server/log/internals/ProducerStateEntry.java | 150 +++++++++ .../kafka/server/log/internals/TxnMetadata.java | 51 +++ 12 files changed, 665 insertions(+), 377 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 83b1b0e81b6..5a098790a31 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} -import 
org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex} +import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer @@ -680,9 +680,10 @@ private[log] class Cleaner(val id: Int, // 3) The last entry in the log is a transaction marker. We retain this marker since it has the // last producer epoch, which is needed to ensure fencing. lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord => - lastRecord.lastDataOffset match { - case Some(offset) => batch.lastOffset == offset - case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch + if (lastRecord.lastDataOffset.isPresent) { + batch.lastOffset == lastRecord.lastDataOffset.getAsLong + } else { + batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch } } } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index d289df2ec47..53b51cb16ac 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult} import java.util.Optional +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ import scala.math._ @@ -249,7 +250,7 @@ class LogSegment private[log] (val log: FileRecords, if (batch.hasProducerId) { val producerId = batch.producerId val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION) - val maybeCompletedTxn = 
appendInfo.append(batch, firstOffsetMetadataOpt = None) + val maybeCompletedTxn = appendInfo.append(batch, Optional.empty()).asScala producerStateManager.update(appendInfo) maybeCompletedTxn.foreach { completedTxn => val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index da9f17c2c22..2dc7748152d 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -20,9 +20,8 @@ import kafka.server.{BrokerReconfigurable, KafkaConfig} import kafka.utils.{Logging, nonthreadsafe, threadsafe} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.types._ -import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch} +import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time} import org.apache.kafka.server.log.internals._ @@ -30,320 +29,11 @@ import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.file.{Files, NoSuchFileException, StandardOpenOption} +import java.util.{Optional, OptionalLong} import java.util.concurrent.ConcurrentSkipListMap -import scala.collection.mutable.ListBuffer import scala.collection.{immutable, mutable} import scala.jdk.CollectionConverters._ -/** - * The last written record for a given producer. The last data offset may be undefined - * if the only log entry for a producer is a transaction marker. 
- */ -case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short) - - -private[log] case class TxnMetadata( - producerId: Long, - firstOffset: LogOffsetMetadata, - var lastOffset: Option[Long] = None -) { - def this(producerId: Long, firstOffset: Long) = this(producerId, new LogOffsetMetadata(firstOffset)) - - override def toString: String = { - "TxnMetadata(" + - s"producerId=$producerId, " + - s"firstOffset=$firstOffset, " + - s"lastOffset=$lastOffset)" - } -} - -private[log] object ProducerStateEntry { - private[log] val NumBatchesToRetain = 5 - - def empty(producerId: Long) = new ProducerStateEntry(producerId, - batchMetadata = mutable.Queue[BatchMetadata](), - producerEpoch = RecordBatch.NO_PRODUCER_EPOCH, - coordinatorEpoch = -1, - lastTimestamp = RecordBatch.NO_TIMESTAMP, - currentTxnFirstOffset = None) -} - -private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) { - def firstSeq: Int = DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta) - def firstOffset: Long = lastOffset - offsetDelta - - override def toString: String = { - "BatchMetadata(" + - s"firstSeq=$firstSeq, " + - s"lastSeq=$lastSeq, " + - s"firstOffset=$firstOffset, " + - s"lastOffset=$lastOffset, " + - s"timestamp=$timestamp)" - } -} - -// the batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the -// batch with the highest sequence is at the tail of the queue. We will retain at most ProducerStateEntry.NumBatchesToRetain -// elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. 
-private[log] class ProducerStateEntry(val producerId: Long, - val batchMetadata: mutable.Queue[BatchMetadata], - var producerEpoch: Short, - var coordinatorEpoch: Int, - var lastTimestamp: Long, - var currentTxnFirstOffset: Option[Long]) { - - def firstSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.front.firstSeq - - def firstDataOffset: Long = if (isEmpty) -1L else batchMetadata.front.firstOffset - - def lastSeq: Int = if (isEmpty) RecordBatch.NO_SEQUENCE else batchMetadata.last.lastSeq - - def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset - - def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta - - def isEmpty: Boolean = batchMetadata.isEmpty - - def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = { - maybeUpdateProducerEpoch(producerEpoch) - addBatchMetadata(BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp)) - this.lastTimestamp = timestamp - } - - def maybeUpdateProducerEpoch(producerEpoch: Short): Boolean = { - if (this.producerEpoch != producerEpoch) { - batchMetadata.clear() - this.producerEpoch = producerEpoch - true - } else { - false - } - } - - private def addBatchMetadata(batch: BatchMetadata): Unit = { - if (batchMetadata.size == ProducerStateEntry.NumBatchesToRetain) - batchMetadata.dequeue() - batchMetadata.enqueue(batch) - } - - def update(nextEntry: ProducerStateEntry): Unit = { - maybeUpdateProducerEpoch(nextEntry.producerEpoch) - while (nextEntry.batchMetadata.nonEmpty) - addBatchMetadata(nextEntry.batchMetadata.dequeue()) - this.coordinatorEpoch = nextEntry.coordinatorEpoch - this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset - this.lastTimestamp = nextEntry.lastTimestamp - } - - def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = { - if (batch.producerEpoch != producerEpoch) - None - else - batchWithSequenceRange(batch.baseSequence, batch.lastSequence) - } - - // Return the 
batch metadata of the cached batch having the exact sequence range, if any. - def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = { - val duplicate = batchMetadata.filter { metadata => - firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq - } - duplicate.headOption - } - - override def toString: String = { - "ProducerStateEntry(" + - s"producerId=$producerId, " + - s"producerEpoch=$producerEpoch, " + - s"currentTxnFirstOffset=$currentTxnFirstOffset, " + - s"coordinatorEpoch=$coordinatorEpoch, " + - s"lastTimestamp=$lastTimestamp, " + - s"batchMetadata=$batchMetadata" - } -} - -/** - * This class is used to validate the records appended by a given producer before they are written to the log. - * It is initialized with the producer's state after the last successful append, and transitively validates the - * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata - * as the incoming records are validated. - * - * @param producerId The id of the producer appending to the log - * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of - * the most recent appends made by the producer. Validation of the first incoming append will - * be made against the latest append in the current entry. New appends will replace older appends - * in the current entry so that the space overhead is constant. - * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset - * commits, which originate from the group coordinator, do not have sequence numbers and therefore - * only producer epoch validation is done. Appends which come through replication are not validated - * (we assume the validation has already been done) and appends from clients require full validation. 
- */ -private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, - val producerId: Long, - val currentEntry: ProducerStateEntry, - val origin: AppendOrigin) extends Logging { - - private val transactions = ListBuffer.empty[TxnMetadata] - private val updatedEntry = ProducerStateEntry.empty(producerId) - - updatedEntry.producerEpoch = currentEntry.producerEpoch - updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch - updatedEntry.lastTimestamp = currentEntry.lastTimestamp - updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset - - private def maybeValidateDataBatch(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = { - checkProducerEpoch(producerEpoch, offset) - if (origin == AppendOrigin.CLIENT) { - checkSequence(producerEpoch, firstSeq, offset) - } - } - - private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = { - if (producerEpoch < updatedEntry.producerEpoch) { - val message = s"Epoch of producer $producerId at offset $offset in $topicPartition is $producerEpoch, " + - s"which is smaller than the last seen epoch ${updatedEntry.producerEpoch}" - - if (origin == AppendOrigin.REPLICATION) { - warn(message) - } else { - // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the - // producer send response callback to differentiate from the former fatal exception, - // letting client abort the ongoing transaction and retry. - throw new InvalidProducerEpochException(message) - } - } - } - - private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = { - if (producerEpoch != updatedEntry.producerEpoch) { - if (appendFirstSeq != 0) { - if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) { - throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch of producer $producerId " + - s"at offset $offset in partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. 
number), " + - s"${updatedEntry.producerEpoch} (current producer epoch)") - } - } - } else { - val currentLastSeq = if (!updatedEntry.isEmpty) - updatedEntry.lastSeq - else if (producerEpoch == currentEntry.producerEpoch) - currentEntry.lastSeq - else - RecordBatch.NO_SEQUENCE - - // If there is no current producer epoch (possibly because all producer records have been deleted due to - // retention or the DeleteRecords API) accept writes with any sequence number - if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { - throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " + - s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " + - s"$currentLastSeq (current end sequence number)") - } - } - } - - private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = { - nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue) - } - - def append(batch: RecordBatch, firstOffsetMetadataOpt: Option[LogOffsetMetadata]): Option[CompletedTxn] = { - if (batch.isControlBatch) { - val recordIterator = batch.iterator - if (recordIterator.hasNext) { - val record = recordIterator.next() - val endTxnMarker = EndTransactionMarker.deserialize(record) - appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp) - } else { - // An empty control batch means the entire transaction has been cleaned from the log, so no need to append - None - } - } else { - val firstOffsetMetadata = firstOffsetMetadataOpt.getOrElse(new LogOffsetMetadata(batch.baseOffset)) - appendDataBatch(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, - firstOffsetMetadata, batch.lastOffset, batch.isTransactional) - None - } - } - - def appendDataBatch(epoch: Short, - firstSeq: Int, - lastSeq: Int, - lastTimestamp: Long, - firstOffsetMetadata: LogOffsetMetadata, - lastOffset: Long, - isTransactional: Boolean): 
Unit = { - val firstOffset = firstOffsetMetadata.messageOffset - maybeValidateDataBatch(epoch, firstSeq, firstOffset) - updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp) - - updatedEntry.currentTxnFirstOffset match { - case Some(_) if !isTransactional => - // Received a non-transactional message while a transaction is active - throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " + - s"offset $firstOffsetMetadata in partition $topicPartition") - - case None if isTransactional => - // Began a new transaction - updatedEntry.currentTxnFirstOffset = Some(firstOffset) - transactions += TxnMetadata(producerId, firstOffsetMetadata) - - case _ => // nothing to do - } - } - - private def checkCoordinatorEpoch(endTxnMarker: EndTransactionMarker, offset: Long): Unit = { - if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch) { - if (origin == AppendOrigin.REPLICATION) { - info(s"Detected invalid coordinator epoch for producerId $producerId at " + - s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " + - s"is older than previously known coordinator epoch ${updatedEntry.coordinatorEpoch}") - } else { - throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " + - s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " + - s"(zombie), ${updatedEntry.coordinatorEpoch} (current)") - } - } - } - - def appendEndTxnMarker( - endTxnMarker: EndTransactionMarker, - producerEpoch: Short, - offset: Long, - timestamp: Long - ): Option[CompletedTxn] = { - checkProducerEpoch(producerEpoch, offset) - checkCoordinatorEpoch(endTxnMarker, offset) - - // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker - // without any associated data will not have any impact on the last stable offset - // and would not need to be reflected in the transaction index. 
- val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset => - new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT) - } - - updatedEntry.maybeUpdateProducerEpoch(producerEpoch) - updatedEntry.currentTxnFirstOffset = None - updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch - updatedEntry.lastTimestamp = timestamp - - completedTxn - } - - def toEntry: ProducerStateEntry = updatedEntry - - def startedTransactions: List[TxnMetadata] = transactions.toList - - override def toString: String = { - "ProducerAppendInfo(" + - s"producerId=$producerId, " + - s"producerEpoch=${updatedEntry.producerEpoch}, " + - s"firstSequence=${updatedEntry.firstSeq}, " + - s"lastSequence=${updatedEntry.lastSeq}, " + - s"currentTxnFirstOffset=${updatedEntry.currentTxnFirstOffset}, " + - s"coordinatorEpoch=${updatedEntry.coordinatorEpoch}, " + - s"lastTimestamp=${updatedEntry.lastTimestamp}, " + - s"startedTransactions=$transactions)" - } -} - object ProducerStateManager { val LateTransactionBufferMs = 5 * 60 * 1000 @@ -403,13 +93,11 @@ object ProducerStateManager { val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata] - if (offset >= 0) - lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp) - - val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch, - coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None) - newEntry + val batchMetadata = + if (offset >= 0) Optional.of(new BatchMetadata(seq, offset, offsetDelta, timestamp)) + else Optional.empty[BatchMetadata]() + val currentTxnFirstOffsetValue = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else 
OptionalLong.empty() + new ProducerStateEntry(producerId, producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetValue, batchMetadata) } } catch { case e: SchemaException => @@ -431,7 +119,7 @@ object ProducerStateManager { .set(OffsetDeltaField, entry.lastOffsetDelta) .set(TimestampField, entry.lastTimestamp) .set(CoordinatorEpochField, entry.coordinatorEpoch) - .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L)) + .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.orElse(-1L)) producerEntryStruct }.toArray struct.set(ProducerEntriesField, entriesArray) @@ -518,7 +206,7 @@ class ProducerStateManager( val lastTimestamp = oldestTxnLastTimestamp lastTimestamp > 0 && (currentTimeMs - lastTimestamp) > maxTransactionTimeoutMs + ProducerStateManager.LateTransactionBufferMs } - + def truncateFullyAndReloadSnapshots(): Unit = { info("Reloading the producer state snapshots") truncateFullyAndStartAt(0L) @@ -652,13 +340,11 @@ class ProducerStateManager( private[log] def loadProducerEntry(entry: ProducerStateEntry): Unit = { val producerId = entry.producerId producers.put(producerId, entry) - entry.currentTxnFirstOffset.foreach { offset => - ongoingTxns.put(offset, new TxnMetadata(producerId, offset)) - } + entry.currentTxnFirstOffset.ifPresent((offset: Long) => ongoingTxns.put(offset, new TxnMetadata(producerId, offset))) } private def isProducerExpired(currentTimeMs: Long, producerState: ProducerStateEntry): Boolean = - producerState.currentTxnFirstOffset.isEmpty && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs + !producerState.currentTxnFirstOffset.isPresent && currentTimeMs - producerState.lastTimestamp >= producerStateManagerConfig.producerIdExpirationMs /** * Expire any producer ids which have been idle longer than the configured maximum expiration timeout. 
@@ -706,8 +392,8 @@ class ProducerStateManager( * Update the mapping with the given append information */ def update(appendInfo: ProducerAppendInfo): Unit = { - if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID) - throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update " + + if (appendInfo.producerId() == RecordBatch.NO_PRODUCER_ID) + throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId()} passed to update " + s"for partition $topicPartition") trace(s"Updated producer ${appendInfo.producerId} state to $appendInfo") @@ -720,7 +406,7 @@ class ProducerStateManager( producers.put(appendInfo.producerId, updatedEntry) } - appendInfo.startedTransactions.foreach { txn => + appendInfo.startedTransactions.asScala.foreach { txn => ongoingTxns.put(txn.firstOffset.messageOffset, txn) } @@ -809,7 +495,7 @@ class ProducerStateManager( while (iterator.hasNext) { val txnEntry = iterator.next() val lastOffset = txnEntry.getValue.lastOffset - if (lastOffset.exists(_ < offset)) + if (lastOffset.isPresent && lastOffset.getAsLong < offset) iterator.remove() } } @@ -849,7 +535,7 @@ class ProducerStateManager( throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " + s"which was not started") - txnMetadata.lastOffset = Some(completedTxn.lastOffset) + txnMetadata.lastOffset = OptionalLong.of(completedTxn.lastOffset) unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata) updateOldestTxnTimestamp() } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index c982e10a3ef..3afad862289 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName import java.io.{File, IOException} import java.nio.file.Files -import java.util.Optional +import java.util.{Optional, OptionalLong} import 
java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit} import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.remote.RemoteLogManager @@ -42,13 +42,14 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 -import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator} +import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo} import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.record.BrokerCompressionType import scala.annotation.nowarn import scala.collection.mutable.ListBuffer import scala.collection.{Seq, immutable, mutable} +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ object LogAppendInfo { @@ -237,7 +238,7 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { */ @threadsafe class UnifiedLog(@volatile var logStartOffset: Long, - private[log] val localLog: LocalLog, + private val localLog: LocalLog, brokerTopicStats: BrokerTopicStats, val producerIdExpirationCheckIntervalMs: Int, @volatile var leaderEpochCache: Option[LeaderEpochFileCache], @@ -672,7 +673,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, .setLastSequence(state.lastSeq) .setLastTimestamp(state.lastTimestamp) .setCoordinatorEpoch(state.coordinatorEpoch) - .setCurrentTxnStartOffset(state.currentTxnFirstOffset.getOrElse(-1L)) + .setCurrentTxnStartOffset(state.currentTxnFirstOffset.orElse(-1L)) } }.toSeq } @@ -685,8 +686,10 @@ class 
UnifiedLog(@volatile var logStartOffset: Long, private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized { producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => - val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None - val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch) + val lastDataOffset = + if (producerIdEntry.lastDataOffset >= 0) OptionalLong.of(producerIdEntry.lastDataOffset) + else OptionalLong.empty() + val lastRecord = new LastRecord(lastDataOffset, producerIdEntry.producerEpoch) producerId -> lastRecord } } @@ -1083,7 +1086,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (origin == AppendOrigin.CLIENT) { val maybeLastEntry = producerStateManager.lastEntry(batch.producerId) - maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate => + maybeLastEntry.flatMap(_.findDuplicateBatch(batch).asScala).foreach { duplicate => return (updatedProducers, completedTxns.toList, Some(duplicate)) } } @@ -1978,7 +1981,7 @@ object UnifiedLog extends Logging { origin: AppendOrigin): Option[CompletedTxn] = { val producerId = batch.producerId val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin)) - appendInfo.append(batch, firstOffsetMetadata) + appendInfo.append(batch, firstOffsetMetadata.asJava).asScala } /** diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 5111320cc4b..ec0fb7d2e73 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -107,7 +107,7 @@ object DumpLogSegments { print(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} " + s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset} " + s"lastTimestamp: ${entry.lastTimestamp} ") - 
entry.batchMetadata.headOption.foreach { metadata => + entry.batchMetadata.asScala.headOption.foreach { metadata => print(s"firstSequence: ${metadata.firstSeq} lastSequence: ${metadata.lastSeq} " + s"lastOffset: ${metadata.lastOffset} offsetDelta: ${metadata.offsetDelta} timestamp: ${metadata.timestamp}") } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 85b4801a0ec..9bd5c56e7ac 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -17,6 +17,8 @@ package kafka.log import java.io.File +import java.util.OptionalLong + import kafka.server.checkpoints.LeaderEpochCheckpoint import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.utils.TestUtils @@ -25,7 +27,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Time, Utils} -import org.apache.kafka.server.log.internals.LogConfig +import org.apache.kafka.server.log.internals.{BatchMetadata, LogConfig, ProducerStateEntry} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -355,9 +357,7 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = newProducerStateManager() - stateManager.loadProducerEntry(new ProducerStateEntry(pid2, - mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, - 0, RecordBatch.NO_TIMESTAMP, Some(75L))) + stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0, RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L), java.util.Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)))) segment.recover(stateManager) assertEquals(108L, stateManager.mapEndOffset) diff --git 
a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 3e5ae15d211..903d51f94f2 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -21,7 +21,7 @@ import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.file.{Files, StandardOpenOption} -import java.util.Collections +import java.util.{Collections, Optional, OptionalLong} import java.util.concurrent.atomic.AtomicInteger import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition @@ -29,11 +29,15 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Utils} -import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata} +import org.apache.kafka.server.log.internals.{AppendOrigin, CompletedTxn, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry, TxnMetadata} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.Mockito.{mock, when} +import java.util +import scala.compat.java8.OptionConverters.RichOptionalGeneric +import scala.jdk.CollectionConverters._ + class ProducerStateManagerTest { private var logDir: File = _ private var stateManager: ProducerStateManager = _ @@ -130,7 +134,7 @@ class ProducerStateManagerTest { val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION) // Sequence number wrap around appendInfo.appendDataBatch(epoch, Int.MaxValue - 10, 9, time.milliseconds(), - new LogOffsetMetadata(2000L), 2020L, isTransactional = false) + new LogOffsetMetadata(2000L), 2020L, false) assertEquals(None, stateManager.lastEntry(producerId)) stateManager.update(appendInfo) 
assertTrue(stateManager.lastEntry(producerId).isDefined) @@ -182,7 +186,7 @@ class ProducerStateManagerTest { val lastEntry = maybeLastEntry.get assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch) - assertEquals(None, lastEntry.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset) assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq) assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq) @@ -200,7 +204,7 @@ class ProducerStateManagerTest { val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224) producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(), - firstOffsetMetadata, offset, isTransactional = true) + firstOffsetMetadata, offset, true) stateManager.update(producerAppendInfo) assertEquals(Some(firstOffsetMetadata), stateManager.firstUnstableOffset) @@ -218,7 +222,7 @@ class ProducerStateManagerTest { appendInfo: ProducerAppendInfo ): Option[CompletedTxn] = { appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType, coordinatorEpoch), - producerEpoch, offset, time.milliseconds()) + producerEpoch, offset, time.milliseconds()).asScala } def appendData( @@ -228,14 +232,14 @@ class ProducerStateManagerTest { ): Unit = { val count = (endOffset - startOffset).toInt appendInfo.appendDataBatch(producerEpoch, seq.get(), seq.addAndGet(count), time.milliseconds(), - new LogOffsetMetadata(startOffset), endOffset, isTransactional = true) + new LogOffsetMetadata(startOffset), endOffset, true) seq.incrementAndGet() } // Start one transaction in a separate append val firstAppend = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) appendData(16L, 20L, firstAppend) - assertEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.head) + assertTxnMetadataEquals(new TxnMetadata(producerId, 16L), firstAppend.startedTransactions.asScala.head) stateManager.update(firstAppend) stateManager.onHighWatermarkUpdated(21L) assertEquals(Some(new 
LogOffsetMetadata(16L)), stateManager.firstUnstableOffset) @@ -255,8 +259,8 @@ class ProducerStateManagerTest { appendData(30L, 31L, secondAppend) assertEquals(2, secondAppend.startedTransactions.size) - assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.head) - assertEquals(TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.last) + assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(24L)), secondAppend.startedTransactions.asScala.head) + assertTxnMetadataEquals(new TxnMetadata(producerId, new LogOffsetMetadata(30L)), secondAppend.startedTransactions.asScala.last) stateManager.update(secondAppend) stateManager.completeTxn(firstCompletedTxn.get) stateManager.completeTxn(secondCompletedTxn.get) @@ -264,6 +268,21 @@ class ProducerStateManagerTest { assertEquals(Some(new LogOffsetMetadata(30L)), stateManager.firstUnstableOffset) } + def assertTxnMetadataEquals(expected: java.util.List[TxnMetadata], actual: java.util.List[TxnMetadata]): Unit = { + val expectedIter = expected.iterator() + val actualIter = actual.iterator() + assertEquals(expected.size(), actual.size()) + while (expectedIter.hasNext && actualIter.hasNext) { + assertTxnMetadataEquals(expectedIter.next(), actualIter.next()) + } + } + + def assertTxnMetadataEquals(expected: TxnMetadata, actual: TxnMetadata): Unit = { + assertEquals(expected.producerId, actual.producerId) + assertEquals(expected.firstOffset, actual.firstOffset) + assertEquals(expected.lastOffset, actual.lastOffset) + } + @Test def testHasLateTransaction(): Unit = { val producerId1 = 39L @@ -373,7 +392,7 @@ class ProducerStateManagerTest { ) val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset) producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(), - firstOffsetMetadata, startOffset, isTransactional = true) + firstOffsetMetadata, startOffset, true) 
stateManager.update(producerAppendInfo) } @@ -417,14 +436,14 @@ class ProducerStateManagerTest { val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) appendInfo.appendDataBatch(producerEpoch, 0, 5, time.milliseconds(), - new LogOffsetMetadata(15L), 20L, isTransactional = false) + new LogOffsetMetadata(15L), 20L, false) assertEquals(None, stateManager.lastEntry(producerId)) stateManager.update(appendInfo) assertTrue(stateManager.lastEntry(producerId).isDefined) val nextAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) nextAppendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(), - new LogOffsetMetadata(26L), 30L, isTransactional = false) + new LogOffsetMetadata(26L), 30L, false) assertTrue(stateManager.lastEntry(producerId).isDefined) var lastEntry = stateManager.lastEntry(producerId).get @@ -448,30 +467,30 @@ class ProducerStateManagerTest { val appendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.CLIENT) appendInfo.appendDataBatch(producerEpoch, 1, 5, time.milliseconds(), - new LogOffsetMetadata(16L), 20L, isTransactional = true) + new LogOffsetMetadata(16L), 20L, true) var lastEntry = appendInfo.toEntry assertEquals(producerEpoch, lastEntry.producerEpoch) assertEquals(1, lastEntry.firstSeq) assertEquals(5, lastEntry.lastSeq) assertEquals(16L, lastEntry.firstDataOffset) assertEquals(20L, lastEntry.lastDataOffset) - assertEquals(Some(16L), lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset) + assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) appendInfo.appendDataBatch(producerEpoch, 6, 10, time.milliseconds(), - new LogOffsetMetadata(26L), 30L, isTransactional = true) + new LogOffsetMetadata(26L), 30L, true) lastEntry = appendInfo.toEntry assertEquals(producerEpoch, 
lastEntry.producerEpoch) assertEquals(1, lastEntry.firstSeq) assertEquals(10, lastEntry.lastSeq) assertEquals(16L, lastEntry.firstDataOffset) assertEquals(30L, lastEntry.lastDataOffset) - assertEquals(Some(16L), lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertEquals(OptionalLong.of(16L), lastEntry.currentTxnFirstOffset) + assertTxnMetadataEquals(util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds()) - assertTrue(completedTxnOpt.isDefined) + assertTrue(completedTxnOpt.isPresent) val completedTxn = completedTxnOpt.get assertEquals(producerId, completedTxn.producerId) @@ -487,8 +506,8 @@ class ProducerStateManagerTest { assertEquals(16L, lastEntry.firstDataOffset) assertEquals(30L, lastEntry.lastDataOffset) assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch) - assertEquals(None, lastEntry.currentTxnFirstOffset) - assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) + assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset) + assertTxnMetadataEquals(java.util.Arrays.asList(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions) } @Test @@ -571,7 +590,7 @@ class ProducerStateManagerTest { assertEquals(1, loadedEntry.get.firstSeq) assertEquals(1, loadedEntry.get.lastDataOffset) assertEquals(1, loadedEntry.get.lastSeq) - assertEquals(Some(0), loadedEntry.get.currentTxnFirstOffset) + assertEquals(OptionalLong.of(0), loadedEntry.get.currentTxnFirstOffset) // entry added after recovery append(recoveredMapping, producerId, epoch, 2, 2L, isTransactional = true) @@ -595,7 +614,7 @@ class ProducerStateManagerTest { assertEquals(1, loadedEntry.get.firstSeq) assertEquals(1, loadedEntry.get.lastDataOffset) assertEquals(1, 
loadedEntry.get.lastSeq) - assertEquals(None, loadedEntry.get.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), loadedEntry.get.currentTxnFirstOffset) } @Test @@ -613,7 +632,7 @@ class ProducerStateManagerTest { val lastEntry = recoveredMapping.lastEntry(producerId) assertTrue(lastEntry.isDefined) assertEquals(appendTimestamp, lastEntry.get.lastTimestamp) - assertEquals(None, lastEntry.get.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), lastEntry.get.currentTxnFirstOffset) } @Test @@ -623,7 +642,7 @@ class ProducerStateManagerTest { appendEndTxnMarker(stateManager, producerId, (epoch + 1).toShort, ControlRecordType.ABORT, offset = 1L) val lastEntry = stateManager.lastEntry(producerId).get - assertEquals(None, lastEntry.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), lastEntry.currentTxnFirstOffset) assertEquals(-1, lastEntry.lastDataOffset) assertEquals(-1, lastEntry.firstDataOffset) @@ -992,7 +1011,7 @@ class ProducerStateManagerTest { // Appending the empty control batch should not throw and a new transaction shouldn't be started append(stateManager, producerId, baseOffset, batch, origin = AppendOrigin.CLIENT) - assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset) + assertEquals(OptionalLong.empty(), stateManager.lastEntry(producerId).get.currentTxnFirstOffset) } @Test @@ -1101,7 +1120,7 @@ class ProducerStateManagerTest { timestamp: Long = time.milliseconds()): Option[CompletedTxn] = { val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = AppendOrigin.COORDINATOR) val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch) - val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp) + val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp).asScala mapping.update(producerAppendInfo) completedTxnOpt.foreach(mapping.completeTxn) mapping.updateMapEndOffset(offset + 1) @@ 
-1129,7 +1148,7 @@ class ProducerStateManagerTest { batch: RecordBatch, origin: AppendOrigin): Unit = { val producerAppendInfo = stateManager.prepareUpdate(producerId, origin) - producerAppendInfo.append(batch, firstOffsetMetadataOpt = None) + producerAppendInfo.append(batch, Optional.empty()) stateManager.update(producerAppendInfo) stateManager.updateMapEndOffset(offset + 1) } diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java new file mode 100644 index 00000000000..668456c3518 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.DefaultRecordBatch; + +public class BatchMetadata { + + public final int lastSeq; + public final long lastOffset; + public final int offsetDelta; + public final long timestamp; + + public BatchMetadata( + int lastSeq, + long lastOffset, + int offsetDelta, + long timestamp) { + this.lastSeq = lastSeq; + this.lastOffset = lastOffset; + this.offsetDelta = offsetDelta; + this.timestamp = timestamp; + } + + public int firstSeq() { + return DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta); + } + + public long firstOffset() { + return lastOffset - offsetDelta; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BatchMetadata that = (BatchMetadata) o; + + return lastSeq == that.lastSeq && + lastOffset == that.lastOffset && + offsetDelta == that.offsetDelta && + timestamp == that.timestamp; + } + + @Override + public int hashCode() { + int result = lastSeq; + result = 31 * result + Long.hashCode(lastOffset); + result = 31 * result + offsetDelta; + result = 31 * result + Long.hashCode(timestamp); + return result; + } + + @Override + public String toString() { + return "BatchMetadata(" + + "firstSeq=" + firstSeq() + + ", lastSeq=" + lastSeq + + ", firstOffset=" + firstOffset() + + ", lastOffset=" + lastOffset + + ", timestamp=" + timestamp + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java new file mode 100644 index 00000000000..78568da7897 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.Objects; +import java.util.OptionalLong; + +/** + * The last written record for a given producer. The last data offset may be undefined + * if the only log entry for a producer is a transaction marker. + */ +public final class LastRecord { + public final OptionalLong lastDataOffset; + public final short producerEpoch; + + public LastRecord(OptionalLong lastDataOffset, short producerEpoch) { + Objects.requireNonNull(lastDataOffset, "lastDataOffset must be non null"); + this.lastDataOffset = lastDataOffset; + this.producerEpoch = producerEpoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LastRecord that = (LastRecord) o; + + return producerEpoch == that.producerEpoch && + lastDataOffset.equals(that.lastDataOffset); + } + + @Override + public int hashCode() { + return 31 * lastDataOffset.hashCode() + producerEpoch; + } + + @Override + public String toString() { + return "LastRecord(" + + "lastDataOffset=" + lastDataOffset + + ", producerEpoch=" + producerEpoch + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java 
b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java new file mode 100644 index 00000000000..90329a45cef --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + private final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * Creates a new instance with the provided parameters. + * + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. 
Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty()); + } + + public long producerId() { + return producerId; + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition + + " is " + producerEpoch + ", " + "which is smaller than the last seen epoch " + updatedEntry.producerEpoch(); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // 
letting client abort the ongoing transaction and retry. + throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); + } + } + } else { + int currentLastSeq; + if (!updatedEntry.isEmpty()) + currentLastSeq = updatedEntry.lastSeq(); + else if (producerEpoch == currentEntry.producerEpoch()) + currentLastSeq = currentEntry.lastSeq(); + else + currentLastSeq = RecordBatch.NO_SEQUENCE; + + // If there is no current producer epoch (possibly because all producer records have been deleted due to + // retention or the DeleteRecords API) accept writes with any sequence number + if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. 
number), " + currentLastSeq + " (current end sequence number)"); + } + } + } + + private boolean inSequence(int lastSeq, int nextSeq) { + return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE); + } + + public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) { + if (batch.isControlBatch()) { + Iterator<Record> recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record); + return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp()); + } else { + // An empty control batch means the entire transaction has been cleaned from the log, so no need to append + return Optional.empty(); + } + } else { + LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset())); + appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(), + firstOffsetMetadata, batch.lastOffset(), batch.isTransactional()); + return Optional.empty(); + } + } + + public void appendDataBatch(short epoch, + int firstSeq, + int lastSeq, + long lastTimestamp, + LogOffsetMetadata firstOffsetMetadata, + long lastOffset, + boolean isTransactional) { + long firstOffset = firstOffsetMetadata.messageOffset; + maybeValidateDataBatch(epoch, firstSeq, firstOffset); + updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); + + OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset; + if (currentTxnFirstOffset.isPresent() && !isTransactional) { + // Received a non-transactional message while a transaction is active + throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " + + "offset " + firstOffsetMetadata + " in partition " + topicPartition); + } else if 
(!currentTxnFirstOffset.isPresent() && isTransactional) { + // Began a new transaction + updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset); + transactions.add(new TxnMetadata(producerId, firstOffsetMetadata)); + } + } + + private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) { + if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) { + if (origin == AppendOrigin.REPLICATION) { + log.info("Detected invalid coordinator epoch for producerId {} at offset {} in partition {}: {} is older than previously known coordinator epoch {}", + producerId, offset, topicPartition, endTxnMarker.coordinatorEpoch(), updatedEntry.coordinatorEpoch); + } else { + throw new TransactionCoordinatorFencedException("Invalid coordinator epoch for producerId " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + endTxnMarker.coordinatorEpoch() + + " (zombie), " + updatedEntry.coordinatorEpoch + " (current)"); + } + } + } + + public Optional<CompletedTxn> appendEndTxnMarker( + EndTransactionMarker endTxnMarker, + short producerEpoch, + long offset, + long timestamp) { + checkProducerEpoch(producerEpoch, offset); + checkCoordinatorEpoch(endTxnMarker, offset); + + // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker + // without any associated data will not have any impact on the last stable offset + // and would not need to be reflected in the transaction index. + Optional<CompletedTxn> completedTxn = updatedEntry.currentTxnFirstOffset.isPresent() ? 
+ Optional.of(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong(), offset, + endTxnMarker.controlType() == ControlRecordType.ABORT)) + : Optional.empty(); + + updatedEntry.maybeUpdateProducerEpoch(producerEpoch); + updatedEntry.currentTxnFirstOffset = OptionalLong.empty(); + updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch(); + updatedEntry.lastTimestamp = timestamp; + + return completedTxn; + } + + public ProducerStateEntry toEntry() { + return updatedEntry; + } + + public List<TxnMetadata> startedTransactions() { + return Collections.unmodifiableList(transactions); + } + + @Override + public String toString() { + return "ProducerAppendInfo(" + + "producerId=" + producerId + + ", producerEpoch=" + updatedEntry.producerEpoch() + + ", firstSequence=" + updatedEntry.firstSeq() + + ", lastSequence=" + updatedEntry.lastSeq() + + ", currentTxnFirstOffset=" + updatedEntry.currentTxnFirstOffset + + ", coordinatorEpoch=" + updatedEntry.coordinatorEpoch + + ", lastTimestamp=" + updatedEntry.lastTimestamp + + ", startedTransactions=" + transactions + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java new file mode 100644 index 00000000000..bbb3e9f9041 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * This class represents the state of a specific producer-id. + * It contains batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the + * queue while the batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. 
+ */ +public class ProducerStateEntry { + public static final int NUM_BATCHES_TO_RETAIN = 5; + + public int coordinatorEpoch; + public long lastTimestamp; + public OptionalLong currentTxnFirstOffset; + + private final long producerId; + private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>(); + private short producerEpoch; + + public static ProducerStateEntry empty(long producerId) { + return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty()); + } + + public ProducerStateEntry(long producerId) { + this(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty()); + } + + public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset, Optional<BatchMetadata> firstBatchMetadata) { + this.producerId = producerId; + this.producerEpoch = producerEpoch; + this.coordinatorEpoch = coordinatorEpoch; + this.lastTimestamp = lastTimestamp; + this.currentTxnFirstOffset = currentTxnFirstOffset; + firstBatchMetadata.ifPresent(batchMetadata::add); + } + + public int firstSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getFirst().firstSeq(); + } + + public int lastSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getLast().lastSeq; + } + + public long firstDataOffset() { + return isEmpty() ? -1L : batchMetadata.getFirst().firstOffset(); + } + + public long lastDataOffset() { + return isEmpty() ? -1L : batchMetadata.getLast().lastOffset; + } + + public int lastOffsetDelta() { + return isEmpty() ? 
0 : batchMetadata.getLast().offsetDelta; + } + + public boolean isEmpty() { + return batchMetadata.isEmpty(); + } + + public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) { + maybeUpdateProducerEpoch(producerEpoch); + addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp)); + this.lastTimestamp = timestamp; + } + + public boolean maybeUpdateProducerEpoch(short producerEpoch) { + if (this.producerEpoch != producerEpoch) { + batchMetadata.clear(); + this.producerEpoch = producerEpoch; + return true; + } else { + return false; + } + } + + private void addBatchMetadata(BatchMetadata batch) { + if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.removeFirst(); + batchMetadata.add(batch); + } + + public void update(ProducerStateEntry nextEntry) { + maybeUpdateProducerEpoch(nextEntry.producerEpoch); + while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.removeFirst()); + this.coordinatorEpoch = nextEntry.coordinatorEpoch; + this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset; + this.lastTimestamp = nextEntry.lastTimestamp; + } + + public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) { + if (batch.producerEpoch() != producerEpoch) return Optional.empty(); + else return batchWithSequenceRange(batch.baseSequence(), batch.lastSequence()); + } + + // Return the batch metadata of the cached batch having the exact sequence range, if any. 
+ Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) { + Stream<BatchMetadata> duplicate = batchMetadata.stream().filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq == metadata.lastSeq); + return duplicate.findFirst(); + } + + public Collection<BatchMetadata> batchMetadata() { + return Collections.unmodifiableCollection(batchMetadata); + } + + public short producerEpoch() { + return producerEpoch; + } + + public long producerId() { + return producerId; + } + + @Override + public String toString() { + return "ProducerStateEntry(" + + "producerId=" + producerId + + ", producerEpoch=" + producerEpoch + + ", currentTxnFirstOffset=" + currentTxnFirstOffset + + ", coordinatorEpoch=" + coordinatorEpoch + + ", lastTimestamp=" + lastTimestamp + + ", batchMetadata=" + batchMetadata + + ')'; + } +} \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java new file mode 100644 index 00000000000..76fcb4c528f --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.Objects; +import java.util.OptionalLong; + +public final class TxnMetadata { + public final long producerId; + public final LogOffsetMetadata firstOffset; + public OptionalLong lastOffset; + + public TxnMetadata(long producerId, + LogOffsetMetadata firstOffset, + OptionalLong lastOffset) { + Objects.requireNonNull(firstOffset, "firstOffset must be non null"); + this.producerId = producerId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } + public TxnMetadata(long producerId, long firstOffset) { + this(producerId, new LogOffsetMetadata(firstOffset)); + } + + public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) { + this(producerId, firstOffset, OptionalLong.empty()); + } + + @Override + public String toString() { + return "TxnMetadata(" + + "producerId=" + producerId + + ", firstOffset=" + firstOffset + + ", lastOffset=" + lastOffset + + ')'; + } +}