jasonk000 commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r801259547



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -578,41 +594,46 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
                 } else {
                     if (shouldStopDrainBatchesForPartition(first, tp))
                         break;
+                }
 
-                    boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
-                    ProducerIdAndEpoch producerIdAndEpoch =
-                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
-                    ProducerBatch batch = deque.pollFirst();
-                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
-                        // If the producer id/epoch of the partition do not match the latest one
-                        // of the producer, we update it and reset the sequence. This should be
-                        // only done when all its in-flight batches have completed. This is guarantee
-                        // in `shouldStopDrainBatchesForPartition`.
-                        transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-                        // If the batch already has an assigned sequence, then we should not change the producer id and
-                        // sequence number, since this may introduce duplicates. In particular, the previous attempt
-                        // may actually have been accepted, and if we change the producer id and sequence here, this
-                        // attempt will also be accepted, causing a duplicate.
-                        //
-                        // Additionally, we update the next sequence number bound for the partition, and also have
-                        // the transaction manager track the batch so as to ensure that sequence ordering is maintained
-                        // even if we receive out of order responses.
-                        batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-                        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-                        log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
-                                "{} being sent to partition {}", producerIdAndEpoch.producerId,
-                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);
-
-                        transactionManager.addInFlightBatch(batch);
-                    }
-                    batch.close();
-                    size += batch.records().sizeInBytes();
-                    ready.add(batch);
+                batch = deque.pollFirst();
+
+                boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
+                ProducerIdAndEpoch producerIdAndEpoch =
+                    transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
+                if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                    // If the producer id/epoch of the partition do not match the latest one
+                    // of the producer, we update it and reset the sequence. This should be
+                    // only done when all its in-flight batches have completed. This is guarantee
+                    // in `shouldStopDrainBatchesForPartition`.
+                    transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+                    // If the batch already has an assigned sequence, then we should not change the producer id and
+                    // sequence number, since this may introduce duplicates. In particular, the previous attempt
+                    // may actually have been accepted, and if we change the producer id and sequence here, this
+                    // attempt will also be accepted, causing a duplicate.
+                    //
+                    // Additionally, we update the next sequence number bound for the partition, and also have
+                    // the transaction manager track the batch so as to ensure that sequence ordering is maintained
+                    // even if we receive out of order responses.
+                    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+                    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
+                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
+                        producerIdAndEpoch.epoch, batch.baseSequence(), tp);
 
-                    batch.drained(now);
+                    transactionManager.addInFlightBatch(batch);
                 }
             }
+
+            // the rest of the work by processing outside the lock
+            // close() is particularly expensive
+
+            batch.close();

Review comment:
       Since we lock on the deque, the batch is either "in" the deque, in which case append and other operations work on it inside the synchronized block, or it has been removed from the deque and is no longer available for other threads to do any work on. From my reading, Sender only collects batches via the drain and expiry paths, both of which acquire the lock before removing the element from the deque. This implies that removal from the deque is atomic: a batch is either drained or expired out, never both. The Sender re-enqueue also works on batches that were previously drained. Nothing stands out to me as a possible area for leakage, but I'm happy to be corrected, and more eyes are always better for this sort of thing.
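
       To make that invariant concrete, here is a minimal sketch of the pattern being discussed (hypothetical names: `Batch`, `DrainSketch`, and `drainOne` are stand-ins, not the actual RecordAccumulator code). Removal happens atomically under the deque lock, so exactly one thread owns the batch afterwards, and the expensive close() can safely run outside the synchronized block:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DrainSketch {
    /** Hypothetical stand-in for ProducerBatch. */
    static class Batch {
        void close() {
            // expensive work: compression, buffer handling, etc.
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    Batch drainOne() {
        final Batch batch;
        synchronized (deque) {
            // Every path that mutates the deque (append, drain, expiry)
            // takes this lock, so pollFirst() atomically transfers
            // ownership of the batch to the calling thread.
            batch = deque.pollFirst();
        }
        if (batch != null) {
            // No other thread can reach this batch any more, so the
            // expensive close() runs outside the lock, keeping the
            // critical section short for threads appending records.
            batch.close();
        }
        return batch;
    }
}
```

       Because only one of the drain/expiry paths can ever poll a given batch, the batch cannot be closed twice or leak to a second consumer, which is the invariant the reasoning above relies on.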



