ijuma commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r822653150
##########
File path:
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long
nowMs) {
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry :
this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
+
+ final ProducerBatch batch;
+ final long waitedTimeMs;
+ final boolean backingOff;
+ final boolean full;
+
+ // Collect as little as possible inside critical region, determine
outcome after release
synchronized (deque) {
- // When producing to a large number of partitions, this path
is hot and deques are often empty.
- // We check whether a batch exists first to avoid the more
expensive checks whenever possible.
- ProducerBatch batch = deque.peekFirst();
- if (batch != null) {
- TopicPartition part = entry.getKey();
- Node leader = cluster.leaderFor(part);
- if (leader == null) {
- // This is a partition for which leader is not known,
but messages are available to send.
- // Note that entries are currently not removed from
batches when deque is empty.
- unknownLeaderTopics.add(part.topic());
- } else if (!readyNodes.contains(leader) && !isMuted(part))
{
- long waitedTimeMs = batch.waitedTimeMs(nowMs);
- boolean backingOff = batch.attempts() > 0 &&
waitedTimeMs < retryBackoffMs;
- long timeToWaitMs = backingOff ? retryBackoffMs :
lingerMs;
- boolean full = deque.size() > 1 || batch.isFull();
- boolean expired = waitedTimeMs >= timeToWaitMs;
- boolean transactionCompleting = transactionManager !=
null && transactionManager.isCompleting();
- boolean sendable = full
- || expired
- || exhausted
- || closed
- || flushInProgress()
- || transactionCompleting;
- if (sendable && !backingOff) {
- readyNodes.add(leader);
- } else {
- long timeLeftMs = Math.max(timeToWaitMs -
waitedTimeMs, 0);
- // Note that this results in a conservative
estimate since an un-sendable partition may have
- // a leader that will later be found to have
sendable data. However, this is good enough
- // since we'll just wake up and then sleep again
for the remaining time.
- nextReadyCheckDelayMs = Math.min(timeLeftMs,
nextReadyCheckDelayMs);
- }
- }
+ batch = deque.peekFirst();
Review comment:
Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]