guozhangwang commented on code in PR #12667:
URL: https://github.com/apache/kafka/pull/12667#discussion_r975630163


##########
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##########
@@ -216,12 +216,12 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
 
   private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: 
Long): Unit = {
     if (producerEpoch != updatedEntry.producerEpoch) {
-      if (appendFirstSeq != 0) {
-        if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for 
new epoch of producer $producerId " +
-            s"at offset $offset in partition $topicPartition: $producerEpoch 
(request epoch), $appendFirstSeq (seq. number), " +
-            s"${updatedEntry.producerEpoch} (current producer epoch)")
-        }
+      if (appendFirstSeq != 0 && updatedEntry.producerEpoch != 
RecordBatch.NO_PRODUCER_EPOCH) {

Review Comment:
   Consolidating the two nested `if` statements into a single condition joined with `&&`.



##########
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##########
@@ -233,10 +233,16 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
 
       // If there is no current producer epoch (possibly because all producer 
records have been deleted due to
       // retention or the DeleteRecords API) accept writes with any sequence 
number
-      if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || 
inSequence(currentLastSeq, appendFirstSeq))) {
+      if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH && 
!inSequence(currentLastSeq, appendFirstSeq)) {

Review Comment:
   Unfolding the negated condition with De Morgan's law: `!(a || b)` becomes `!a && !b`, which reads more directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to