Repository: kafka Updated Branches: refs/heads/0.11.0 ab554caee -> 3c96f9128
KAFKA-6003; Accept appends on replicas and when rebuilding the log unconditionally This is a port of #4004 for the 0.11.0 branch. With this patch, we _only_ validate appends which originate from the client. In general, once the append is validated and written to the leader the first time, revalidating it is undesirable since we can't do anything if validation fails, and also because it is hard to maintain the correct assumptions during validation, leading to spurious validation failures. For example, when we have compacted topics, it is possible for batches to be compacted on the follower but not on the leader. This case would also lead to an OutOfOrderSequenceException during replication. The same applies when we rebuild state from compacted topics: we would get gaps in the sequence numbers, causing the OutOfOrderSequenceException. Author: Apurva Mehta <apu...@confluent.io> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #4020 from apurvam/KAKFA-6003-0.11.0-handle-unknown-producer-on-replica Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c96f912 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c96f912 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c96f912 Branch: refs/heads/0.11.0 Commit: 3c96f9128af8ddc5d50c64af380827f7a0038287 Parents: ab554ca Author: Apurva Mehta <apu...@confluent.io> Authored: Mon Oct 9 10:34:43 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Oct 9 10:34:43 2017 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 8 +-- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- .../scala/kafka/log/ProducerStateManager.scala | 25 ++++---- .../scala/unit/kafka/log/LogSegmentTest.scala | 2 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 15 +++-- .../kafka/log/ProducerStateManagerTest.scala | 62 +++++++++++++++++--- 6 files changed, 84 insertions(+), 30 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f8b5d82..170e9cb 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -501,7 +501,7 @@ class Log(@volatile var dir: File, val completedTxns = ListBuffer.empty[CompletedTxn] records.batches.asScala.foreach { batch => if (batch.hasProducerId) { - val maybeCompletedTxn = updateProducers(batch, loadedProducers, loadingFromLog = true) + val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false) maybeCompletedTxn.foreach(completedTxns += _) } } @@ -762,7 +762,7 @@ class Log(@volatile var dir: File, if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch))) return (updatedProducers, completedTxns.toList, maybeLastEntry) - val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false) + val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient) maybeCompletedTxn.foreach(completedTxns += _) } (updatedProducers, completedTxns.toList, None) @@ -849,9 +849,9 @@ class Log(@volatile var dir: File, private def updateProducers(batch: RecordBatch, producers: mutable.Map[Long, ProducerAppendInfo], - loadingFromLog: Boolean): Option[CompletedTxn] = { + isFromClient: Boolean): Option[CompletedTxn] = { val producerId = batch.producerId - val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, loadingFromLog)) + val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient)) appendInfo.append(batch) } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4e0834c..53eafa1 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -147,7 +147,7 @@ class LogSegment(val log: FileRecords, private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = { if (batch.hasProducerId) { val producerId = batch.producerId - val appendInfo = producerStateManager.prepareUpdate(producerId, loadingFromLog = true) + val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false) val maybeCompletedTxn = appendInfo.append(batch) producerStateManager.update(appendInfo) maybeCompletedTxn.foreach { completedTxn => http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 7a1962a..24530da 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -91,16 +91,17 @@ private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short, * of this is the consumer offsets topic which uses producer ids from incoming * TxnOffsetCommit, but has no sequence number to validate and does not depend * on the deduplication which sequence numbers provide. - * @param loadingFromLog This parameter indicates whether the new append is being loaded directly from the log. - * This is used to repopulate producer state when the broker is initialized. 
The only - * difference in behavior is that we do not validate the sequence number of the first append - * since we may have lost previous sequence numbers when segments were removed due to log - * retention enforcement. + * @param isFromClient The parameter indicates whether the write is coming from a client or not. If it is not coming + * from a client, it could be due to replication traffic, or when rebuilding producer state on + * from the log. In the latter two cases, we should not validate the append, but accept the + * incoming append unconditionally. This is for two reasons: first, the write was already + * validated when received from the client. Second, the data is already the log, so it is not + * clear what would be achieved by validating it again. */ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: ProducerIdEntry, validateSequenceNumbers: Boolean, - loadingFromLog: Boolean) { + isFromClient: Boolean) { private var producerEpoch = initialEntry.producerEpoch private var firstSeq = initialEntry.firstSeq private var lastSeq = initialEntry.lastSeq @@ -108,6 +109,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, private var maxTimestamp = initialEntry.timestamp private var currentTxnFirstOffset = initialEntry.currentTxnFirstOffset private var coordinatorEpoch = initialEntry.coordinatorEpoch + private val transactions = ListBuffer.empty[TxnMetadata] private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int) = { @@ -161,9 +163,10 @@ private[log] class ProducerAppendInfo(val producerId: Long, lastTimestamp: Long, lastOffset: Long, isTransactional: Boolean): Unit = { - if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog) - // skip validation if this is the first entry when loading from the log. Log retention - // will generally have removed the beginning entries from each producer id + if (isFromClient) + // We should only validate appends coming from the client. 
In particular, this means that we don't validate + // appends for sequence numbers and epochs when building producer state from the log or for writes on a replica. + // So validation only happens on the first write from the client to the partition leader. validateAppend(epoch, firstSeq, lastSeq) this.producerEpoch = epoch @@ -507,9 +510,9 @@ class ProducerStateManager(val topicPartition: TopicPartition, } } - def prepareUpdate(producerId: Long, loadingFromLog: Boolean): ProducerAppendInfo = + def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo = new ProducerAppendInfo(producerId, lastEntry(producerId).getOrElse(ProducerIdEntry.Empty), validateSequenceNumbers, - loadingFromLog) + isFromClient) /** * Update the mapping with the given append information http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 79fe220..6ef5e19 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -274,7 +274,7 @@ class LogSegmentTest { val segment = createSegment(100) val producerEpoch = 0.toShort val partitionLeaderEpoch = 15 - val sequence = 0 + val sequence = 100 val pid1 = 5L val pid2 = 10L http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 3c80bff..d2d1d6c 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -891,22 +891,27 @@ class LogTest { } } - @Test(expected = classOf[DuplicateSequenceNumberException]) + 
@Test def testDuplicateAppendToFollower() : Unit = { val log = createLog(1024*1024) val epoch: Short = 0 val pid = 1L val baseSequence = 0 val partitionLeaderEpoch = 0 + // The point of this test is to ensure that validation isn't performed on the follower. // this is a bit contrived. to trigger the duplicate case for a follower append, we have to append // a batch with matching sequence numbers, but valid increasing offsets + assertEquals(0L, log.logEndOffset) log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, CompressionType.NONE, pid, epoch, baseSequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, CompressionType.NONE, pid, epoch, baseSequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) + + // Ensure that even the duplicate sequences are accepted on the follower. + assertEquals(4L, log.logEndOffset) } - @Test(expected = classOf[DuplicateSequenceNumberException]) + @Test def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = { val log = createLog(1024*1024) @@ -950,9 +955,11 @@ class LogTest { val records = MemoryRecords.readableRecords(buffer) records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) + + // Ensure that batches with duplicates are accepted on the follower. + assertEquals(0L, log.logEndOffset) log.appendAsFollower(records) - // Should throw a duplicate sequence exception here. 
- fail("should have thrown a DuplicateSequenceNumberException.") + assertEquals(5L, log.logEndOffset) } @Test(expected = classOf[ProducerFencedException]) http://git-wip-us.apache.org/repos/asf/kafka/blob/3c96f912/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 9a324aa..3c7c07b 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -86,7 +86,7 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 15.toShort val sequence = Int.MaxValue val offset = 735L - append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, sequence, offset, isFromClient = false) append(stateManager, producerId, epoch, 0, offset + 500) @@ -104,7 +104,7 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 15.toShort val sequence = Int.MaxValue val offset = 735L - append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, sequence, offset, isFromClient = false) append(stateManager, producerId, epoch, 1, offset + 500) } @@ -113,7 +113,7 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 5.toShort val sequence = 16 val offset = 735L - append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, sequence, offset, isFromClient = false) val maybeLastEntry = stateManager.lastEntry(producerId) assertTrue(maybeLastEntry.isDefined) @@ -159,7 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite { val offset = 992342L val seq = 0 val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, 
validateSequenceNumbers = true, - loadingFromLog = false) + isFromClient = true) producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true) val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L, @@ -176,7 +176,7 @@ class ProducerStateManagerTest extends JUnitSuite { val offset = 992342L val seq = 0 val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true, - loadingFromLog = false) + isFromClient = true) producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true) // use some other offset to simulate a follower append where the log offset metadata won't typically @@ -196,7 +196,7 @@ class ProducerStateManagerTest extends JUnitSuite { val offset = 9L append(stateManager, producerId, producerEpoch, 0, offset) - val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false) + val appendInfo = stateManager.prepareUpdate(producerId, isFromClient = true) appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true) var lastEntry = appendInfo.lastEntry assertEquals(producerEpoch, lastEntry.producerEpoch) @@ -319,6 +319,50 @@ class ProducerStateManagerTest extends JUnitSuite { } @Test + def testAcceptAppendWithoutProducerStateOnReplica(): Unit = { + val epoch = 0.toShort + append(stateManager, producerId, epoch, 0, 0L, 0) + append(stateManager, producerId, epoch, 1, 1L, 1) + + stateManager.takeSnapshot() + val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs) + recoveredMapping.truncateAndReload(0L, 1L, 70000) + + val sequence = 2 + // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Nonetheless + // the append on a replica should be accepted with the local producer state updated to the appended value. 
+ assertFalse(recoveredMapping.activeProducers.contains(producerId)) + append(recoveredMapping, producerId, epoch, sequence, 2L, 70001, isFromClient = false) + assertTrue(recoveredMapping.activeProducers.contains(producerId)) + val producerIdEntry = recoveredMapping.activeProducers.get(producerId).head + assertEquals(epoch, producerIdEntry.producerEpoch) + assertEquals(sequence, producerIdEntry.firstSeq) + assertEquals(sequence, producerIdEntry.lastSeq) + } + + @Test + def testAcceptAppendWithSequenceGapsOnReplica(): Unit = { + val epoch = 0.toShort + append(stateManager, producerId, epoch, 0, 0L, 0) + val outOfOrderSequence = 3 + + // First we ensure that we raise an OutOfOrderSequenceException is raised when the append comes from a client. + try { + append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = true) + fail("Expected an OutOfOrderSequenceException to be raised.") + } catch { + case _ : OutOfOrderSequenceException => + // Good! + case _ : Exception => + fail("Expected an OutOfOrderSequenceException to be raised.") + } + + assertEquals(0L, stateManager.activeProducers(producerId).lastSeq) + append(stateManager, producerId, epoch, outOfOrderSequence, 1L, 1, isFromClient = false) + assertEquals(outOfOrderSequence, stateManager.activeProducers(producerId).lastSeq) + } + + @Test def testDeleteSnapshotsBefore(): Unit = { val epoch = 0.toShort append(stateManager, producerId, epoch, 0, 0L) @@ -673,7 +717,7 @@ class ProducerStateManagerTest extends JUnitSuite { offset: Long, coordinatorEpoch: Int = 0, timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = { - val producerAppendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false) + val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient = true) val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch) val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp) 
mapping.update(producerAppendInfo) @@ -689,8 +733,8 @@ class ProducerStateManagerTest extends JUnitSuite { offset: Long, timestamp: Long = time.milliseconds(), isTransactional: Boolean = false, - isLoadingFromLog: Boolean = false): Unit = { - val producerAppendInfo = stateManager.prepareUpdate(producerId, isLoadingFromLog) + isFromClient : Boolean = true): Unit = { + val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient) producerAppendInfo.append(producerEpoch, seq, seq, timestamp, offset, isTransactional) stateManager.update(producerAppendInfo) stateManager.updateMapEndOffset(offset + 1)