artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r850625357


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -103,7 +103,18 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
      * @return The RecordSend corresponding to this record or null if there 
isn't sufficient room.
      */
     public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] 
value, Header[] headers, Callback callback, long now) {
-        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
+        return tryAppend(timestamp, key, value, headers, callback, now, null);
+    }
+
+    /**
+     * Append the record to the current record set and return the relative 
offset within that record set.  An optional
+     * argument recordInfo can be used to return info about the record.
+     *
+     * @return The RecordSend corresponding to this record or null if there 
isn't sufficient room.
+     */
+    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] 
value, Header[] headers,
+                                          Callback callback, long now, 
MemoryRecordsBuilder.RecordInfo recordInfo) {

Review Comment:
   Just added recordInfo here.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -208,43 +290,88 @@ public RecordAppendResult append(TopicPartition tp,
                 return new RecordAppendResult(null, false, false, true);
             }
 
-            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
-            log.trace("Allocating a new {} byte message buffer for topic {} 
partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), 
maxTimeToBlock);
-            buffer = free.allocate(size, maxTimeToBlock);
+            RecordAppendResult appendResult = appendNewBatch(topic, partition, 
dq, sizeStats, timestamp, key, value, headers, callbacks, maxTimeToBlock);
+
+            // To properly estimate the amount of bytes produced to a 
partition, we keep track
+            // of batch headers.  The record size estimator would atomically 
extract this value
+            // and account for it in the record byte estimation.
+            if (appendResult.newBatchCreated && countBatchHeader)
+                sizeStats.onNewBatch(apiVersions.maxUsableProduceMagic(), 
compression);
 
+            return appendResult;
+        } finally {
+            appendsInProgress.decrementAndGet();
+        }
+    }
+
+    /**
+     * Append a new batch to the queue
+     *
+     * @param topic The topic
+     * @param partition The partition (cannot be 
RecordMetadata.UNKNOWN_PARTITION)
+     * @param dq The queue
+     * @param sizeStats The size stats
+     * @param timestamp The timestamp of the record
+     * @param key The key for the record
+     * @param value The value for the record
+     * @param headers the Headers for the record
+     * @param callbacks The callbacks to execute
+     * @param maxTimeToBlock The maximum time in milliseconds to block for 
buffer memory to be available
+     */
+    private RecordAppendResult appendNewBatch(String topic,

Review Comment:
   This is mostly just moving part of the logic from append to a separate 
function.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -74,10 +76,13 @@ public class RecordAccumulator {
     private final int lingerMs;
     private final long retryBackoffMs;
     private final int deliveryTimeoutMs;
+    private final long partitionAvailabilityTimeoutMs;  // latency threshold 
for marking partition temporary unavailable
+    private final boolean enableAdaptivePartitioning;
     private final BufferPool free;
     private final Time time;
     private final ApiVersions apiVersions;
-    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
+    private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = 
new CopyOnWriteMap<>();

Review Comment:
   Instead of a flat map {tp -> batches}, it's now a map of maps {topic -> {partition -> {batches, etc.}}}, for a couple of reasons:
   1. Queue size statistics calculation is more efficient if partitions are grouped by topic.
   2. There is other per-topic data (partitioner info).
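
   To illustrate the layout, here is a minimal sketch of a topic-grouped structure. It is not the PR code: `TopicGroupedBatches`, `add` and `queueSizes` are hypothetical names, strings stand in for `ProducerBatch`, and `ConcurrentHashMap` stands in for `CopyOnWriteMap`. It only shows why per-topic queue sizes and the nested iteration become straightforward.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for the accumulator's per-topic layout.
// Strings replace ProducerBatch; ConcurrentHashMap replaces CopyOnWriteMap.
class TopicGroupedBatches {
    // Per-topic state: partition -> queue of batches. Other per-topic data
    // (for example partitioner info) would live next to the batches map.
    static final class TopicInfo {
        final ConcurrentMap<Integer, Deque<String>> batches = new ConcurrentHashMap<>();
    }

    private final ConcurrentMap<String, TopicInfo> topicInfoMap = new ConcurrentHashMap<>();

    // Appends a batch to the queue of the given topic and partition.
    void add(String topic, int partition, String batch) {
        Deque<String> deque = topicInfoMap
            .computeIfAbsent(topic, t -> new TopicInfo())
            .batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (deque) {
            deque.addLast(batch);
        }
    }

    // Collects queue sizes for one topic without scanning other topics.
    int[] queueSizes(String topic, int numPartitions) {
        int[] sizes = new int[numPartitions];
        TopicInfo info = topicInfoMap.get(topic);
        if (info == null)
            return sizes;
        for (Map.Entry<Integer, Deque<String>> entry : info.batches.entrySet()) {
            int partition = entry.getKey();
            Deque<String> deque = entry.getValue();
            synchronized (deque) {
                if (partition >= 0 && partition < numPartitions)
                    sizes[partition] = deque.size();
            }
        }
        return sizes;
    }

    // Nested iteration over the map of maps, in the style of hasUndrained.
    boolean hasUndrained() {
        for (TopicInfo info : topicInfoMap.values()) {
            for (Deque<String> deque : info.batches.values()) {
                synchronized (deque) {
                    if (!deque.isEmpty())
                        return true;
                }
            }
        }
        return false;
    }
}
```

   With a flat {tp -> batches} map, the same per-topic query would need to filter every entry by topic name.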
   



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -420,38 +552,94 @@ private void insertInSequenceOrder(Deque<ProducerBatch> 
deque, ProducerBatch bat
     }
 
     /**
-     * Get a list of nodes whose partitions are ready to be sent, and the 
earliest time at which any non-sendable
-     * partition will be ready; Also return the flag for whether there are any 
unknown leaders for the accumulated
-     * partition batches.
-     * <p>
-     * A destination node is ready to send data if:
-     * <ol>
-     * <li>There is at least one partition that is not backing off its send
-     * <li><b>and</b> those partitions are not muted (to prevent reordering if
-     *   {@value 
org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
-     *   is set to one)</li>
-     * <li><b>and <i>any</i></b> of the following are true</li>
-     * <ul>
-     *     <li>The record set is full</li>
-     *     <li>The record set has sat in the accumulator for at least lingerMs 
milliseconds</li>
-     *     <li>The accumulator is out of memory and threads are blocking 
waiting for data (in this case all partitions
-     *     are immediately considered ready).</li>
-     *     <li>The accumulator has been closed</li>
-     * </ul>
-     * </ol>
+     * Add the leader to the ready nodes if the batch is ready
+     *
+     * @param nowMs The current time
+     * @param exhausted 'true' is the buffer pool is exhausted
+     * @param part The partition
+     * @param leader The leader for the partition
+     * @param waitedTimeMs How long batch waited
+     * @param backingOff Is backing off
+     * @param full Is batch full
+     * @param nextReadyCheckDelayMs The delay for next check
+     * @param readyNodes The set of ready nodes (to be filled in)
+     * @return The delay for next check
      */
-    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
-        Set<Node> readyNodes = new HashSet<>();
-        long nextReadyCheckDelayMs = Long.MAX_VALUE;
-        Set<String> unknownLeaderTopics = new HashSet<>();
+    private long batchReady(long nowMs, boolean exhausted, TopicPartition 
part, Node leader,

Review Comment:
   This is pretty much the content of the inner loop of the `ready` function.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -262,14 +389,18 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer 
buffer, byte maxUsableMag
      *  if it is expired, or when the producer is closed.
      */
     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] 
value, Header[] headers,
-                                         Callback callback, 
Deque<ProducerBatch> deque, long nowMs) {
+                                         Callback callback, 
Deque<ProducerBatch> deque,
+                                         
BuiltInPartitioner.PartitionByteSizeStats sizeStats, long nowMs) {

Review Comment:
   Just a new argument.



##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -540,6 +540,16 @@ private static Header[] readHeaders(ByteBuffer buffer, int 
numHeaders) {
         return headers;
     }
 
+    public static int sizeInBytes(int recordOverhead,

Review Comment:
   The changes here are just a refactoring of existing logic.



##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -2046,21 +2047,27 @@ private <T> FutureRecordMetadata expectAppend(
         )).thenReturn(initialSelectedPartition.partition());
 
         when(ctx.accumulator.append(
-            eq(initialSelectedPartition),
-            eq(timestamp),
-            eq(serializedKey),
-            eq(serializedValue),
-            eq(Record.EMPTY_HEADERS),
-            any(Callback.class),
+            eq(initialSelectedPartition.topic()),            // 0
+            eq(initialSelectedPartition.partition()),        // 1
+            eq(timestamp),                                   // 2
+            eq(serializedKey),                               // 3
+            eq(serializedValue),                             // 4
+            eq(Record.EMPTY_HEADERS),                        // 5
+            any(RecordAccumulator.AppendCallbacks.class),    // 6 <--
             anyLong(),
             eq(true),
-            anyLong()
-        )).thenReturn(new RecordAccumulator.RecordAppendResult(
-            futureRecordMetadata,
-            false,
-            false,
-            false
-        ));
+            anyLong(),
+            any()
+        )).thenAnswer(invocation -> {
+            RecordAccumulator.AppendCallbacks callbacks =
+                (RecordAccumulator.AppendCallbacks) 
invocation.getArguments()[6];
+            callbacks.setPartition(initialSelectedPartition.partition());

Review Comment:
   Needed to extend the mock to call the setPartition callback.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -693,8 +693,9 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(metrics), thi
 
         assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
 
-        Future<RecordMetadata> responseFuture1 = accumulator.append(tp0, 
time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS,
-                null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+        Future<RecordMetadata> responseFuture1 = 
accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),

Review Comment:
   Changes here are just to reflect function signature changes.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1017,6 +1023,153 @@ public void testStickyBatches() throws Exception {
         assertEquals(appends, 2 * expectedAppends);
     }
 
+    @Test
+    public void testUniformBuiltInPartitioner() throws Exception {

Review Comment:
   This and testAdaptiveBuiltInPartitioner are new additions; the rest is just tweaks to reflect changes in function signatures.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -546,6 +554,76 @@ public void testMetadataTopicExpiry() throws Exception {
         assertTrue(future.isDone(), "Request should be completed");
     }
 
+    @Test
+    public void testNodeLatencyStats() throws Exception {

Review Comment:
   This is a new addition; the rest is just updates to reflect function signature / callback changes.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -463,55 +651,94 @@ public ReadyCheckResult ready(Cluster cluster, long 
nowMs) {
             synchronized (deque) {
                 // Deques are often empty in this path, esp with large 
partition counts,
                 // so we exit early if we can.
-                batch = deque.peekFirst();
+                ProducerBatch batch = deque.peekFirst();
                 if (batch == null) {
                     continue;
                 }
 
                 waitedTimeMs = batch.waitedTimeMs(nowMs);
                 backingOff = batch.attempts() > 0 && waitedTimeMs < 
retryBackoffMs;
-                full = deque.size() > 1 || batch.isFull();
+                dequeSize = deque.size();
+                full = dequeSize > 1 || batch.isFull();
             }
 
-            TopicPartition part = entry.getKey();
-            Node leader = cluster.leaderFor(part);
             if (leader == null) {
                 // This is a partition for which leader is not known, but 
messages are available to send.
                 // Note that entries are currently not removed from batches 
when deque is empty.
                 unknownLeaderTopics.add(part.topic());
-            } else if (!readyNodes.contains(leader) && !isMuted(part)) {
-                long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-                boolean expired = waitedTimeMs >= timeToWaitMs;
-                boolean transactionCompleting = transactionManager != null && 
transactionManager.isCompleting();
-                boolean sendable = full
-                    || expired
-                    || exhausted
-                    || closed
-                    || flushInProgress()
-                    || transactionCompleting;
-                if (sendable && !backingOff) {
-                    readyNodes.add(leader);
-                } else {
-                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-                    // Note that this results in a conservative estimate since 
an un-sendable partition may have
-                    // a leader that will later be found to have sendable 
data. However, this is good enough
-                    // since we'll just wake up and then sleep again for the 
remaining time.
-                    nextReadyCheckDelayMs = Math.min(timeLeftMs, 
nextReadyCheckDelayMs);
+            } else {
+                if (queueSizes != null)
+                    queueSizes[queueSizesIndex] = dequeSize;
+                if (partitionAvailabilityTimeoutMs > 0) {
+                    // Check if we want to exclude the partition from the list 
of available partitions
+                    // if the broker hasn't responded for some time.
+                    NodeLatencyStats nodeLatencyStats = 
nodeStats.get(leader.id());
+                    if (nodeLatencyStats != null) {
+                        // NOTE: there is no synchronization between reading 
metrics,
+                        // so we read ready time first to avoid accidentally 
marking partition
+                        // unavailable if we read while the metrics are being 
updated.
+                        long readyTimeMs = nodeLatencyStats.readyTimeMs;
+                        if (readyTimeMs - nodeLatencyStats.drainTimeMs > 
partitionAvailabilityTimeoutMs)
+                            --queueSizesIndex;
+                    }
                 }
+
+                nextReadyCheckDelayMs = batchReady(nowMs, exhausted, part, 
leader, waitedTimeMs, backingOff,
+                    full, nextReadyCheckDelayMs, readyNodes);
             }
         }
+
+        // We've collected the queue sizes for partitions of this topic, now 
we can calculate
+        // load stats.  NOTE: the stats are calculated in place, modifying the
+        // queueSizes array.
+        topicInfo.builtInPartitioner.updatePartitionLoadStats(queueSizes, 
partitionIds, queueSizesIndex + 1);
+        return nextReadyCheckDelayMs;
+    }
+
+    /**
+     * Get a list of nodes whose partitions are ready to be sent, and the 
earliest time at which any non-sendable
+     * partition will be ready; Also return the flag for whether there are any 
unknown leaders for the accumulated
+     * partition batches.
+     * <p>
+     * A destination node is ready to send data if:
+     * <ol>
+     * <li>There is at least one partition that is not backing off its send
+     * <li><b>and</b> those partitions are not muted (to prevent reordering if
+     *   {@value 
org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
+     *   is set to one)</li>
+     * <li><b>and <i>any</i></b> of the following are true</li>
+     * <ul>
+     *     <li>The record set is full</li>
+     *     <li>The record set has sat in the accumulator for at least lingerMs 
milliseconds</li>
+     *     <li>The accumulator is out of memory and threads are blocking 
waiting for data (in this case all partitions
+     *     are immediately considered ready).</li>
+     *     <li>The accumulator has been closed</li>
+     * </ul>
+     * </ol>
+     */
+    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
+        Set<Node> readyNodes = new HashSet<>();
+        long nextReadyCheckDelayMs = Long.MAX_VALUE;
+        Set<String> unknownLeaderTopics = new HashSet<>();
+        // Go topic by topic so that we can get queue sizes for partitions in 
a topic and calculate
+        // probability weights (used in partitioner).
+        for (Map.Entry<String, TopicInfo> topicInfoEntry : 
this.topicInfoMap.entrySet()) {
+            final String topic = topicInfoEntry.getKey();
+            nextReadyCheckDelayMs = partitionReady(cluster, nowMs, topic, 
topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, 
unknownLeaderTopics);
+        }
         return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, 
unknownLeaderTopics);
     }
 
     /**
      * Check whether there are any batches which haven't been drained
      */
     public boolean hasUndrained() {
-        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : 
this.batches.entrySet()) {
-            Deque<ProducerBatch> deque = entry.getValue();
-            synchronized (deque) {
-                if (!deque.isEmpty())
-                    return true;
+        for (TopicInfo topicInfo : topicInfoMap.values()) {
+            for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {

Review Comment:
   Just changed the iteration, because it's a map of maps now.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -420,38 +552,94 @@ private void insertInSequenceOrder(Deque<ProducerBatch> 
deque, ProducerBatch bat
     }
 
     /**
-     * Get a list of nodes whose partitions are ready to be sent, and the 
earliest time at which any non-sendable
-     * partition will be ready; Also return the flag for whether there are any 
unknown leaders for the accumulated
-     * partition batches.
-     * <p>
-     * A destination node is ready to send data if:
-     * <ol>
-     * <li>There is at least one partition that is not backing off its send
-     * <li><b>and</b> those partitions are not muted (to prevent reordering if
-     *   {@value 
org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
-     *   is set to one)</li>
-     * <li><b>and <i>any</i></b> of the following are true</li>
-     * <ul>
-     *     <li>The record set is full</li>
-     *     <li>The record set has sat in the accumulator for at least lingerMs 
milliseconds</li>
-     *     <li>The accumulator is out of memory and threads are blocking 
waiting for data (in this case all partitions
-     *     are immediately considered ready).</li>
-     *     <li>The accumulator has been closed</li>
-     * </ul>
-     * </ol>
+     * Add the leader to the ready nodes if the batch is ready
+     *
+     * @param nowMs The current time
+     * @param exhausted 'true' is the buffer pool is exhausted
+     * @param part The partition
+     * @param leader The leader for the partition
+     * @param waitedTimeMs How long batch waited
+     * @param backingOff Is backing off
+     * @param full Is batch full
+     * @param nextReadyCheckDelayMs The delay for next check
+     * @param readyNodes The set of ready nodes (to be filled in)
+     * @return The delay for next check
      */
-    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
-        Set<Node> readyNodes = new HashSet<>();
-        long nextReadyCheckDelayMs = Long.MAX_VALUE;
-        Set<String> unknownLeaderTopics = new HashSet<>();
+    private long batchReady(long nowMs, boolean exhausted, TopicPartition 
part, Node leader,
+                            long waitedTimeMs, boolean backingOff, boolean 
full,
+                            long nextReadyCheckDelayMs, Set<Node> readyNodes) {
+        if (!readyNodes.contains(leader) && !isMuted(part)) {
+            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+            boolean expired = waitedTimeMs >= timeToWaitMs;
+            boolean transactionCompleting = transactionManager != null && 
transactionManager.isCompleting();
+            boolean sendable = full
+                    || expired
+                    || exhausted
+                    || closed
+                    || flushInProgress()
+                    || transactionCompleting;
+            if (sendable && !backingOff) {
+                readyNodes.add(leader);
+            } else {
+                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
+                // Note that this results in a conservative estimate since an 
un-sendable partition may have
+                // a leader that will later be found to have sendable data. 
However, this is good enough
+                // since we'll just wake up and then sleep again for the 
remaining time.
+                nextReadyCheckDelayMs = Math.min(timeLeftMs, 
nextReadyCheckDelayMs);
+            }
+        }
+        return nextReadyCheckDelayMs;
+    }
 
+    /**
+     * Iterate over partitions to see which one have batches ready and collect 
leaders of those partitions
+     * into the set of ready nodes.  If partition has no leader, add the topic 
to the set of topics with
+     * no leader.  This function also calculates stats for adaptive 
partitioning.
+     *
+     * @param cluster The cluster metadata
+     * @param nowMs The current time
+     * @param topic The topic
+     * @param topicInfo The topic info
+     * @param nextReadyCheckDelayMs The delay for next check
+     * @param readyNodes The set of ready nodes (to be filled in)
+     * @param unknownLeaderTopics The set of topics with no leader (to be 
filled in)
+     * @return The delay for next check
+     */
+    private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   This is the inner loop of the `ready` function.  The addition to the original logic is the queue size calculation.  Partitions are now grouped by topic, so for every topic we can do the queue size calculations in one go and update the probability weights used by the adaptive partitioning logic.
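
   As a rough illustration of turning queue sizes into probability weights, here is a minimal sketch. It assumes that a shorter queue should get a proportionally larger weight and that the weights are stored as a cumulative table, computed in place. The class `QueueSizeWeights`, its method names and the exact weighting formula are assumptions for this example and may differ from what `updatePartitionLoadStats` does.

```java
import java.util.Arrays;

// Hypothetical illustration; the names and the weighting formula are assumptions.
class QueueSizeWeights {
    // Converts the first `length` queue sizes, in place, into a cumulative
    // weight table in which a shorter queue receives a larger weight.
    static void toCumulativeWeights(int[] queueSizes, int length) {
        int maxPlus1 = 1;
        for (int i = 0; i < length; i++)
            maxPlus1 = Math.max(maxPlus1, queueSizes[i] + 1);
        int running = 0;
        for (int i = 0; i < length; i++) {
            // The weight is at least 1, so the table is strictly increasing.
            running += maxPlus1 - queueSizes[i];
            queueSizes[i] = running;
        }
    }

    // Maps a value in [0, cumulative[length - 1]) to the index whose bucket
    // contains it. Bucket i covers [cumulative[i - 1], cumulative[i]).
    static int pick(int[] cumulative, int length, int value) {
        int result = Arrays.binarySearch(cumulative, 0, length, value);
        // An exact hit at index i is the first value of bucket i + 1;
        // otherwise the insertion point is the bucket index.
        return result >= 0 ? result + 1 : -(result + 1);
    }
}
```

   A caller would draw `value` uniformly from `[0, cumulative[length - 1])`, for example with `ThreadLocalRandom.current().nextInt(...)`, so that indexes with shorter queues are picked more often.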



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -298,19 +429,20 @@ public void maybeUpdateNextBatchExpiryTime(ProducerBatch 
batch) {
      */
     public List<ProducerBatch> expiredBatches(long now) {
         List<ProducerBatch> expiredBatches = new ArrayList<>();
-        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : 
this.batches.entrySet()) {
-            // expire the batches in the order of sending
-            Deque<ProducerBatch> deque = entry.getValue();
-            synchronized (deque) {
-                while (!deque.isEmpty()) {
-                    ProducerBatch batch = deque.getFirst();
-                    if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, 
now)) {
-                        deque.poll();
-                        batch.abortRecordAppends();
-                        expiredBatches.add(batch);
-                    } else {
-                        maybeUpdateNextBatchExpiryTime(batch);
-                        break;
+        for (TopicInfo topicInfo : topicInfoMap.values()) {
+            for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {

Review Comment:
   Now it's a different loop because it's a map of maps, but the body is the same.


