[ https://issues.apache.org/jira/browse/KAFKA-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586140#comment-16586140 ]
ASF GitHub Bot commented on KAFKA-7298: --------------------------------------- hachikuji closed pull request #5518: KAFKA-7298; Raise UnknownProducerIdException if next sequence number is unknown URL: https://github.com/apache/kafka/pull/5518 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 49c887b771c..2f711234bdb 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -234,9 +234,13 @@ private[log] class ProducerAppendInfo(val producerId: Long, RecordBatch.NO_SEQUENCE if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) { - // the epoch was bumped by a control record, so we expect the sequence number to be reset - throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $appendFirstSeq " + - s"(incoming seq. number), but expected 0") + // We have a matching epoch, but we do not know the next sequence number. This case can happen if + // only a transaction marker is left in the log for this producer. We treat this as an unknown + // producer id error, so that the producer can check the log start offset for truncation and reset + // the sequence number. Note that this check follows the fencing check, so the marker still fences + // old producers even if it cannot determine our next expected sequence number. 
+ throw new UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " + + s"for producerId=$producerId, but next expected sequence number is not known.") } else if (!inSequence(currentLastSeq, appendFirstSeq)) { throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " + s"(incoming seq. number), $currentLastSeq (current end sequence number)") diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 053aed7c915..f9f4a239023 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -81,6 +81,35 @@ class ProducerStateManagerTest extends JUnitSuite { } } + @Test + def testAppendTxnMarkerWithNoProducerState(): Unit = { + val producerEpoch = 2.toShort + appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 27L) + + val firstEntry = stateManager.lastEntry(producerId).getOrElse(fail("Expected last entry to be defined")) + assertEquals(producerEpoch, firstEntry.producerEpoch) + assertEquals(producerId, firstEntry.producerId) + assertEquals(RecordBatch.NO_SEQUENCE, firstEntry.lastSeq) + + // Fencing should continue to work even if the marker is the only thing left + assertThrows[ProducerFencedException] { + append(stateManager, producerId, 0.toShort, 0, 0L, 4L) + } + + // If the transaction marker is the only thing left in the log, then an attempt to write using a + // non-zero sequence number should cause an UnknownProducerId, so that the producer can reset its state + assertThrows[UnknownProducerIdException] { + append(stateManager, producerId, producerEpoch, 17, 0L, 4L) + } + + // The broker should accept the request if the sequence number is reset to 0 + append(stateManager, producerId, producerEpoch, 0, 39L, 4L) + val secondEntry = 
stateManager.lastEntry(producerId).getOrElse(fail("Expected last entry to be defined")) + assertEquals(producerEpoch, secondEntry.producerEpoch) + assertEquals(producerId, secondEntry.producerId) + assertEquals(0, secondEntry.lastSeq) + } + @Test def testProducerSequenceWrapAround(): Unit = { val epoch = 15.toShort ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Concurrent DeleteRecords can lead to fatal OutOfSequence error in producer > -------------------------------------------------------------------------- > > Key: KAFKA-7298 > URL: https://issues.apache.org/jira/browse/KAFKA-7298 > Project: Kafka > Issue Type: Bug > Reporter: Jason Gustafson > Assignee: Jason Gustafson > Priority: Major > > We have logic in the producer to handle unknown producer errors. Basically > when the producer gets an unknown producer error, it checks whether the log > start offset is larger than the last acknowledged offset. If it is, then we > know the error is spurious and we reset the sequence number to 0, which the > broker will then accept. > It can happen after a DeleteRecords call, however, that the only record > remaining in the log is a transaction marker, which does not have a sequence > number. The error we get in this case is OUT_OF_SEQUENCE, which is fatal to the > producer, rather than UNKNOWN_PRODUCER, which the logic above can recover from. -- This message was sent by Atlassian JIRA (v7.6.3#76005)