showuon commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r800338273
##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -578,41 +594,46 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
        } else {
            if (shouldStopDrainBatchesForPartition(first, tp))
                break;
+       }

-           boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
-           ProducerIdAndEpoch producerIdAndEpoch =
-               transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
-           ProducerBatch batch = deque.pollFirst();
-           if (producerIdAndEpoch != null && !batch.hasSequence()) {
-               // If the producer id/epoch of the partition do not match the latest one
-               // of the producer, we update it and reset the sequence. This should
-               // only be done when all its in-flight batches have completed. This is
-               // guaranteed in `shouldStopDrainBatchesForPartition`.
-               transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-               // If the batch already has an assigned sequence, then we should not change the producer id and
-               // sequence number, since this may introduce duplicates. In particular, the previous attempt
-               // may actually have been accepted, and if we change the producer id and sequence here, this
-               // attempt will also be accepted, causing a duplicate.
-               //
-               // Additionally, we update the next sequence number bound for the partition, and also have
-               // the transaction manager track the batch so as to ensure that sequence ordering is maintained
-               // even if we receive out of order responses.
-               batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-               transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-               log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
-                   "{} being sent to partition {}", producerIdAndEpoch.producerId,
-                   producerIdAndEpoch.epoch, batch.baseSequence(), tp);
-
-               transactionManager.addInFlightBatch(batch);
-           }
-           batch.close();
-           size += batch.records().sizeInBytes();
-           ready.add(batch);

+       batch = deque.pollFirst();
+
+       boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
+       ProducerIdAndEpoch producerIdAndEpoch =
+           transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
+       if (producerIdAndEpoch != null && !batch.hasSequence()) {
+           // If the producer id/epoch of the partition do not match the latest one
+           // of the producer, we update it and reset the sequence. This should
+           // only be done when all its in-flight batches have completed. This is
+           // guaranteed in `shouldStopDrainBatchesForPartition`.
+           transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+           // If the batch already has an assigned sequence, then we should not change the producer id and
+           // sequence number, since this may introduce duplicates. In particular, the previous attempt
+           // may actually have been accepted, and if we change the producer id and sequence here, this
+           // attempt will also be accepted, causing a duplicate.
+           //
+           // Additionally, we update the next sequence number bound for the partition, and also have
+           // the transaction manager track the batch so as to ensure that sequence ordering is maintained
+           // even if we receive out of order responses.
+           batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+           transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+           log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
+               "{} being sent to partition {}", producerIdAndEpoch.producerId,
+               producerIdAndEpoch.epoch, batch.baseSequence(), tp);

-           batch.drained(now);
+           transactionManager.addInFlightBatch(batch);
       }
   }
+
+   // Do the rest of the work outside the lock;
+   // close() is particularly expensive.
+
+   batch.close();

Review comment:
Looks like it won't happen, since we only lock on the deque object, but I just want to confirm, to make sure it won't break anything.
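To make the locking question concrete, here is a minimal sketch of the pattern the PR moves to: poll the batch while holding the deque lock, then run the expensive `close()` after the lock is released. The `Batch` class, `append`, and `drain` below are simplified stand-ins for illustration, not Kafka's actual `ProducerBatch`/`RecordAccumulator` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DrainOutsideLockSketch {

    // Simplified stand-in for ProducerBatch; close() represents the expensive
    // finalization work (building the final record set, compression, etc.).
    static class Batch {
        private boolean closed = false;

        void close() {
            closed = true; // the expensive work would happen here
        }

        boolean isClosed() {
            return closed;
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    public void append(Batch batch) {
        synchronized (deque) {
            deque.addLast(batch);
        }
    }

    // Mirrors the PR's structure: only the poll (plus any sequence bookkeeping)
    // runs under the deque lock; close() runs after the lock is released.
    public List<Batch> drain() {
        List<Batch> ready = new ArrayList<>();
        while (true) {
            Batch batch;
            synchronized (deque) {
                batch = deque.pollFirst();
            }
            if (batch == null)
                break;
            // Done outside the lock: once polled, this batch is no longer
            // reachable through the deque, so no appender can touch it.
            batch.close();
            ready.add(batch);
        }
        return ready;
    }

    public static void main(String[] args) {
        DrainOutsideLockSketch accumulator = new DrainOutsideLockSketch();
        accumulator.append(new Batch());
        accumulator.append(new Batch());
        List<Batch> drained = accumulator.drain();
        System.out.println("drained " + drained.size() + " batches, all closed: "
            + drained.stream().allMatch(Batch::isClosed));
    }
}
```

The safety argument in the sketch rests on the deque being the only shared path to an undrained batch; the review comment is essentially asking whether any other code path in the real producer can still reach the batch after it is polled.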