Repository: kafka Updated Branches: refs/heads/trunk b5b266eee -> 6ea4fffdd
KAFKA-6003; Accept appends on replicas unconditionally when local producer state doesn't exist Without this patch, if the replica's log was somehow truncated before the leader's, it is possible for the replica fetcher thread to continuously throw an OutOfOrderSequenceException because the incoming sequence would be non-zero and there is no local state. This patch changes the behavior so that the replica state is updated to the leader's state if there was no local state for the producer at the time of the append. Author: Apurva Mehta <apu...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #4004 from apurvam/KAFKA-6003-handle-unknown-producer-on-replica Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ea4fffd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ea4fffd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ea4fffd Branch: refs/heads/trunk Commit: 6ea4fffdd287a0c6a02c1b6dc1006b1a7b614405 Parents: b5b266e Author: Apurva Mehta <apu...@confluent.io> Authored: Wed Oct 4 22:27:03 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Oct 4 22:27:03 2017 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 8 +- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- .../scala/kafka/log/ProducerStateManager.scala | 101 ++++++++++++------- .../scala/unit/kafka/log/LogSegmentTest.scala | 5 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 15 ++- .../kafka/log/ProducerStateManagerTest.scala | 64 ++++++++++-- 6 files changed, 134 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala index dc47194..d397ca6 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -514,7 +514,7 @@ class Log(@volatile var dir: File, val completedTxns = ListBuffer.empty[CompletedTxn] records.batches.asScala.foreach { batch => if (batch.hasProducerId) { - val maybeCompletedTxn = updateProducers(batch, loadedProducers, loadingFromLog = true) + val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false) maybeCompletedTxn.foreach(completedTxns += _) } } @@ -791,7 +791,7 @@ class Log(@volatile var dir: File, return (updatedProducers, completedTxns.toList, Some(duplicate)) } - val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false) + val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient) maybeCompletedTxn.foreach(completedTxns += _) } (updatedProducers, completedTxns.toList, None) @@ -878,9 +878,9 @@ class Log(@volatile var dir: File, private def updateProducers(batch: RecordBatch, producers: mutable.Map[Long, ProducerAppendInfo], - loadingFromLog: Boolean): Option[CompletedTxn] = { + isFromClient: Boolean): Option[CompletedTxn] = { val producerId = batch.producerId - val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, loadingFromLog)) + val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient)) appendInfo.append(batch) } http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 06c4e2d..845f08f 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -145,7 +145,7 @@ class LogSegment(val log: 
FileRecords, private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = { if (batch.hasProducerId) { val producerId = batch.producerId - val appendInfo = producerStateManager.prepareUpdate(producerId, loadingFromLog = true) + val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false) val maybeCompletedTxn = appendInfo.append(batch) producerStateManager.update(appendInfo) maybeCompletedTxn.foreach { completedTxn => http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 1cf9a14..81726c1 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -36,6 +36,15 @@ import scala.collection.{immutable, mutable} class CorruptSnapshotException(msg: String) extends KafkaException(msg) + +// ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance +private[log] sealed trait ValidationType +private[log] object ValidationType { + case object None extends ValidationType + case object EpochOnly extends ValidationType + case object Full extends ValidationType +} + private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) { def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset)) @@ -138,49 +147,58 @@ private[log] class ProducerIdEntry(val producerId: Long, val batchMetadata: muta * the most recent appends made by the producer. Validation of the first incoming append will * be made against the lastest append in the current entry. 
New appends will replace older appends * in the current entry so that the space overhead is constant. - * @param validateSequenceNumbers Whether or not sequence numbers should be validated. The only current use - * of this is the consumer offsets topic which uses producer ids from incoming - * TxnOffsetCommit, but has no sequence number to validate and does not depend - * on the deduplication which sequence numbers provide. - * @param loadingFromLog This parameter indicates whether the new append is being loaded directly from the log. - * This is used to repopulate producer state when the broker is initialized. The only - * difference in behavior is that we do not validate the sequence number of the first append - * since we may have lost previous sequence numbers when segments were removed due to log - * retention enforcement. + * @param validationType Indicates the extent of validation to perform on the appends on this instance. Offset commits + * coming from the producer should have ValidationType.EpochOnly. Appends which aren't from a client + * will not be validated at all, and should be set to ValidationType.None. All other appends should + * have ValidationType.Full. 
*/ private[log] class ProducerAppendInfo(val producerId: Long, currentEntry: ProducerIdEntry, - validateSequenceNumbers: Boolean, - loadingFromLog: Boolean) { + validationType: ValidationType) { private val transactions = ListBuffer.empty[TxnMetadata] - private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = { + private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = { + validationType match { + case ValidationType.None => + + case ValidationType.EpochOnly => + checkEpoch(producerEpoch) + + case ValidationType.Full => + checkEpoch(producerEpoch) + checkSequence(producerEpoch, firstSeq, lastSeq) + } + } + + private def checkEpoch(producerEpoch: Short): Unit = { if (isFenced(producerEpoch)) { throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " + s"with a newer epoch. $producerEpoch (request epoch), ${currentEntry.producerEpoch} (server epoch)") - } else if (validateSequenceNumbers) { - if (producerEpoch != currentEntry.producerEpoch) { - if (firstSeq != 0) { - if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) { - throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " + - s"(request epoch), $firstSeq (seq. number)") - } else { - throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " + - s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.") - } + } + } + + private def checkSequence(producerEpoch: Short, firstSeq: Int, lastSeq: Int): Unit = { + if (producerEpoch != currentEntry.producerEpoch) { + if (firstSeq != 0) { + if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " + + s"(request epoch), $firstSeq (seq. 
number)") + } else { + throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " + + s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.") } - } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) { - // the epoch was bumped by a control record, so we expect the sequence number to be reset - throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " + - s"(incoming seq. number), but expected 0") - } else if (isDuplicate(firstSeq, lastSeq)) { - throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " + - s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).") - } else if (!inSequence(firstSeq, lastSeq)) { - throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " + - s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)") } + } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) { + // the epoch was bumped by a control record, so we expect the sequence number to be reset + throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " + + s"(incoming seq. number), but expected 0") + } else if (isDuplicate(firstSeq, lastSeq)) { + throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " + + s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).") + } else if (!inSequence(firstSeq, lastSeq)) { + throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " + + s"(incoming seq. 
number), ${currentEntry.lastSeq} (current end sequence number)") } } @@ -216,10 +234,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, lastTimestamp: Long, lastOffset: Long, isTransactional: Boolean): Unit = { - if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog) - // skip validation if this is the first entry when loading from the log. Log retention - // will generally have removed the beginning entries from each producer id - validateAppend(epoch, firstSeq, lastSeq) + maybeValidateAppend(epoch, firstSeq, lastSeq) currentEntry.addBatchMetadata(epoch, lastSeq, lastOffset, lastSeq - firstSeq, lastTimestamp) @@ -541,9 +556,17 @@ class ProducerStateManager(val topicPartition: TopicPartition, } } - def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo = - new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)), validateSequenceNumbers, - loadingFromLog) + def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo = { + val validationToPerform = + if (!isFromClient) + ValidationType.None + else if (topicPartition.topic == Topic.GROUP_METADATA_TOPIC_NAME) + ValidationType.EpochOnly + else + ValidationType.Full + + new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.empty(producerId)), validationToPerform) + } /** * Update the mapping with the given append information http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 0f866e7..cef2bca 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -274,7 +274,7 @@ class LogSegmentTest { val segment = createSegment(100) val producerEpoch = 
0.toShort val partitionLeaderEpoch = 15 - val sequence = 0 + val sequence = 100 val pid1 = 5L val pid2 = 10L @@ -317,7 +317,8 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = new ProducerStateManager(topicPartition, logDir) - stateManager.loadProducerEntry(new ProducerIdEntry(pid2, mutable.Queue[BatchMetadata](BatchMetadata(10, 90L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L))) + stateManager.loadProducerEntry(new ProducerIdEntry(pid2, + mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, 0, Some(75L))) segment.recover(stateManager) assertEquals(108L, stateManager.mapEndOffset) http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 2ae62c5..6d40967 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -869,7 +869,7 @@ class LogTest { } } - @Test(expected = classOf[DuplicateSequenceException]) + @Test def testDuplicateAppendToFollower() : Unit = { val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) val log = createLog(logDir, logConfig) @@ -877,15 +877,20 @@ class LogTest { val pid = 1L val baseSequence = 0 val partitionLeaderEpoch = 0 + // The point of this test is to ensure that validation isn't performed on the follower. // this is a bit contrived. 
to trigger the duplicate case for a follower append, we have to append // a batch with matching sequence numbers, but valid increasing offsets + assertEquals(0L, log.logEndOffset) log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, CompressionType.NONE, pid, epoch, baseSequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, CompressionType.NONE, pid, epoch, baseSequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) + + // Ensure that even the duplicate sequences are accepted on the follower. + assertEquals(4L, log.logEndOffset) } - @Test(expected = classOf[DuplicateSequenceException]) + @Test def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = { val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) val log = createLog(logDir, logConfig) @@ -930,9 +935,11 @@ class LogTest { val records = MemoryRecords.readableRecords(buffer) records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) + + // Ensure that batches with duplicates are accepted on the follower. + assertEquals(0L, log.logEndOffset) log.appendAsFollower(records) - // Should throw a duplicate sequence exception here. 
- fail("should have thrown a DuplicateSequenceNumberException.") + assertEquals(5L, log.logEndOffset) } @Test(expected = classOf[ProducerFencedException]) http://git-wip-us.apache.org/repos/asf/kafka/blob/6ea4fffd/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 9eb9ae7..8650624 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -86,7 +86,7 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 15.toShort val sequence = Int.MaxValue val offset = 735L - append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, sequence, offset, isFromClient = false) append(stateManager, producerId, epoch, 0, offset + 500) @@ -105,7 +105,7 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 15.toShort val sequence = Int.MaxValue val offset = 735L - append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, sequence, offset, isFromClient = false) append(stateManager, producerId, epoch, 1, offset + 500) } @@ -114,7 +114,7 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 5.toShort val sequence = 16 val offset = 735L - append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, sequence, offset, isFromClient = false) val maybeLastEntry = stateManager.lastEntry(producerId) assertTrue(maybeLastEntry.isDefined) @@ -159,8 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 0.toShort val offset = 992342L val seq = 0 - val producerAppendInfo = new ProducerAppendInfo(producerId, 
ProducerIdEntry.empty(producerId), validateSequenceNumbers = true, - loadingFromLog = false) + val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), ValidationType.Full) producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true) val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L, @@ -176,8 +175,7 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 0.toShort val offset = 992342L val seq = 0 - val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), validateSequenceNumbers = true, - loadingFromLog = false) + val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), ValidationType.Full) producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true) // use some other offset to simulate a follower append where the log offset metadata won't typically @@ -197,7 +195,7 @@ class ProducerStateManagerTest extends JUnitSuite { val offset = 9L append(stateManager, producerId, producerEpoch, 0, offset) - val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false) + val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true) appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true) var lastEntry = appendInfo.latestEntry assertEquals(producerEpoch, lastEntry.producerEpoch) @@ -321,6 +319,50 @@ class ProducerStateManagerTest extends JUnitSuite { } @Test + def testAcceptAppendWithoutProducerStateOnReplica(): Unit = { + val epoch = 0.toShort + append(stateManager, producerId, epoch, 0, 0L, 0) + append(stateManager, producerId, epoch, 1, 1L, 1) + + stateManager.takeSnapshot() + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + recoveredMapping.truncateAndReload(0L, 1L, 70000) + + val sequence = 2 + // entry added 
after recovery. The pid should be expired now, and would not exist in the pid mapping. Nonetheless + // the append on a replica should be accepted with the local producer state updated to the appended value. + assertFalse(recoveredMapping.activeProducers.contains(producerId)) + append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, isFromClient = false) + assertTrue(recoveredMapping.activeProducers.contains(producerId)) + val producerIdEntry = recoveredMapping.activeProducers.get(producerId).head + assertEquals(epoch, producerIdEntry.producerEpoch) + assertEquals(sequence, producerIdEntry.firstSeq) + assertEquals(sequence, producerIdEntry.lastSeq) + } + + @Test + def testAcceptAppendWithSequenceGapsOnReplica(): Unit = { + val epoch = 0.toShort + append(stateManager, producerId, epoch, 0, 0L, 0) + val outOfOrderSequence = 3 + + // First we ensure that an OutOfOrderSequenceException is raised when the append comes from a client. + try { + append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = true) + fail("Expected an OutOfOrderSequenceException to be raised.") + } catch { + case _ : OutOfOrderSequenceException => + // Good! 
+ case _ : Exception => + fail("Expected an OutOfOrderSequenceException to be raised.") + } + + assertEquals(0L, stateManager.activeProducers(producerId).lastSeq) + append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = false) + assertEquals(outOfOrderSequence, stateManager.activeProducers(producerId).lastSeq) + } + + @Test def testDeleteSnapshotsBefore(): Unit = { val epoch = 0.toShort append(stateManager, producerId, epoch, 0, 0L) @@ -675,7 +717,7 @@ class ProducerStateManagerTest extends JUnitSuite { offset: Long, coordinatorEpoch: Int = 0, timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = { - val producerAppendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false) + val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true) val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch) val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp) mapping.update(producerAppendInfo) @@ -691,8 +733,8 @@ class ProducerStateManagerTest extends JUnitSuite { offset: Long, timestamp: Long = time.milliseconds(), isTransactional: Boolean = false, - isLoadingFromLog: Boolean = false): Unit = { - val producerAppendInfo = stateManager.prepareUpdate(producerId, isLoadingFromLog) + isFromClient : Boolean = true): Unit = { + val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient) producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, isTransactional) stateManager.update(producerAppendInfo) stateManager.updateMapEndOffset(offset + 1)