artemlivshits commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r798988451



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
         boolean exhausted = this.free.queued() > 0;
         for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
             Deque<ProducerBatch> deque = entry.getValue();
+
+            final ProducerBatch batch;
+            final long waitedTimeMs;
+            final boolean backingOff;
+            final boolean full;
+
+            // Collect as little as possible inside critical region, determine outcome after release
             synchronized (deque) {
-                // When producing to a large number of partitions, this path is hot and deques are often empty.
-                // We check whether a batch exists first to avoid the more expensive checks whenever possible.
-                ProducerBatch batch = deque.peekFirst();
-                if (batch != null) {
-                    TopicPartition part = entry.getKey();
-                    Node leader = cluster.leaderFor(part);
-                    if (leader == null) {
-                        // This is a partition for which leader is not known, but messages are available to send.
-                        // Note that entries are currently not removed from batches when deque is empty.
-                        unknownLeaderTopics.add(part.topic());
-                    } else if (!readyNodes.contains(leader) && !isMuted(part)) {
-                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
-                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
-                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-                        boolean full = deque.size() > 1 || batch.isFull();
-                        boolean expired = waitedTimeMs >= timeToWaitMs;
-                        boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
-                        boolean sendable = full
-                            || expired
-                            || exhausted
-                            || closed
-                            || flushInProgress()
-                            || transactionCompleting;
-                        if (sendable && !backingOff) {
-                            readyNodes.add(leader);
-                        } else {
-                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-                            // Note that this results in a conservative estimate since an un-sendable partition may have
-                            // a leader that will later be found to have sendable data. However, this is good enough
-                            // since we'll just wake up and then sleep again for the remaining time.
-                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
-                        }
-                    }
+                batch = deque.peekFirst();

Review comment:
       Changes in the `ready` function LGTM

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -578,41 +587,45 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
                 } else {
                     if (shouldStopDrainBatchesForPartition(first, tp))
                         break;
+                }
 
-                    boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
-                    ProducerIdAndEpoch producerIdAndEpoch =
-                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
-                    ProducerBatch batch = deque.pollFirst();
-                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
-                        // If the producer id/epoch of the partition do not match the latest one
-                        // of the producer, we update it and reset the sequence. This should be
-                        // only done when all its in-flight batches have completed. This is guarantee
-                        // in `shouldStopDrainBatchesForPartition`.
-                        transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-                        // If the batch already has an assigned sequence, then we should not change the producer id and
-                        // sequence number, since this may introduce duplicates. In particular, the previous attempt
-                        // may actually have been accepted, and if we change the producer id and sequence here, this
-                        // attempt will also be accepted, causing a duplicate.
-                        //
-                        // Additionally, we update the next sequence number bound for the partition, and also have
-                        // the transaction manager track the batch so as to ensure that sequence ordering is maintained
-                        // even if we receive out of order responses.
-                        batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-                        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
-                        log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
-                                "{} being sent to partition {}", producerIdAndEpoch.producerId,
-                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);
-
-                        transactionManager.addInFlightBatch(batch);
-                    }
-                    batch.close();
-                    size += batch.records().sizeInBytes();
-                    ready.add(batch);
+                // do the rest of the work by processing outside the lock
+                // close() is particularly expensive
+                batch = deque.pollFirst();
+            }
 
-                    batch.drained(now);
-                }
+            boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
+            ProducerIdAndEpoch producerIdAndEpoch =
+                transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
+            if (producerIdAndEpoch != null && !batch.hasSequence()) {
+                // If the producer id/epoch of the partition do not match the latest one
+                // of the producer, we update it and reset the sequence. This should be
+                // only done when all its in-flight batches have completed. This is guarantee
+                // in `shouldStopDrainBatchesForPartition`.
+                transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+                // If the batch already has an assigned sequence, then we should not change the producer id and
+                // sequence number, since this may introduce duplicates. In particular, the previous attempt
+                // may actually have been accepted, and if we change the producer id and sequence here, this
+                // attempt will also be accepted, causing a duplicate.
+                //
+                // Additionally, we update the next sequence number bound for the partition, and also have
+                // the transaction manager track the batch so as to ensure that sequence ordering is maintained
+                // even if we receive out of order responses.
+                batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+                transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);

Review comment:
       I think the expectation is that these 2 calls are atomic (i.e. it would be bad if one thread executed 615, then another thread executed 615 again and got the same sequence number before the first thread got a chance to execute 616).
   I also think the expectation is that batches ordered one after another in the queue get their sequence numbers in the same order (i.e. a batch that is later in the queue gets a higher sequence number).
   Previously these expectations were protected by the queue lock, so "poll", "get sequence", and "update sequence" executed as one atomic block. With this change the operations can interleave.
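
   To make the concern concrete, here is a minimal sketch of the invariant, not the Kafka code. `SequenceAssignmentSketch`, `Batch`, `drainOne`, and `drainConcurrently` are hypothetical names invented for illustration, and a plain `int` counter stands in for the transaction manager's per-partition sequence. It assumes the fix keeps "poll", "get sequence", and "update sequence" under the deque monitor and moves only the expensive work outside it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: none of these types are part of Kafka.
class SequenceAssignmentSketch {

    // Stand-in for ProducerBatch, reduced to the fields the sketch needs.
    static final class Batch {
        final int recordCount;
        int baseSequence = -1; // -1 means "no sequence assigned yet"
        Batch(int recordCount) { this.recordCount = recordCount; }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();
    private int nextSequence = 0; // guarded by the deque monitor

    void append(Batch batch) {
        synchronized (deque) { deque.addLast(batch); }
    }

    // "poll", "get sequence" and "update sequence" run as one atomic step,
    // so two threads can never read the same sequence, and queue order
    // determines sequence order.
    Batch drainOne() {
        synchronized (deque) {
            Batch batch = deque.pollFirst();
            if (batch == null) {
                return null;
            }
            batch.baseSequence = nextSequence;
            nextSequence += batch.recordCount;
            return batch;
        }
    }

    // Drains the queue from several threads and returns the assigned base
    // sequences in the order the batches were enqueued.
    static List<Integer> drainConcurrently(int batchCount, int recordsPerBatch, int threadCount) {
        SequenceAssignmentSketch sketch = new SequenceAssignmentSketch();
        List<Batch> inQueueOrder = new ArrayList<>();
        for (int i = 0; i < batchCount; i++) {
            Batch batch = new Batch(recordsPerBatch);
            inQueueOrder.add(batch);
            sketch.append(batch);
        }
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread thread = new Thread(() -> {
                while (sketch.drainOne() != null) {
                    // Expensive per-batch work (e.g. close()) could run here, outside the lock.
                }
            });
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        List<Integer> sequences = new ArrayList<>();
        for (Batch batch : inQueueOrder) {
            sequences.add(batch.baseSequence);
        }
        return sequences;
    }
}
```

   If the two sequence statements in `drainOne` were moved after the `synchronized` block, as the diff above does with the real calls, two threads could read the same `nextSequence`, or a later batch could be numbered before an earlier one.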




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.