jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205777679
########## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ########## @@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { } } + public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) { + if (this.producerEpoch < producerEpoch) { + batchMetadata.clear(); + this.producerEpoch = producerEpoch; + return true; + } else { + return false; + } + } + + // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions + // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if: + // a) There is no tentative sequence yet + // b) A lower sequence for the same epoch is seen and should thereby block records after that + // c) A higher producer epoch is found that will reset the lowest seen sequence + public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) { + if (batchMetadata.isEmpty() && Review Comment: I dug into this a bit more. We actually do check the sequence on a bumped epoch! ``` if (producerEpoch != updatedEntry.producerEpoch()) { if (appendFirstSeq != 0) { if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); ``` Basically, we perform this check when the updated entry's epoch differs from the epoch we want to use. The tricky part is that we update the epoch when creating the verification state, so we just need to do that check there somehow. I will look into it. 
########## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ########## @@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { } } + public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) { + if (this.producerEpoch < producerEpoch) { + batchMetadata.clear(); + this.producerEpoch = producerEpoch; + return true; + } else { + return false; + } + } + + // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions + // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if: + // a) There is no tentative sequence yet + // b) A lower sequence for the same epoch is seen and should thereby block records after that + // c) A higher producer epoch is found that will reset the lowest seen sequence + public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) { + if (batchMetadata.isEmpty() && Review Comment: I dug into this a bit more. We actually do check the sequence on a bumped epoch! ``` if (producerEpoch != updatedEntry.producerEpoch()) { if (appendFirstSeq != 0) { if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); ``` Basically, we perform this check when the updated entry's epoch differs from the epoch we want to use. The tricky part is that we update the epoch when creating the verification state, so we just need to do that check there somehow. I will look into it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org