junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r861165914


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -160,91 +218,156 @@ public double measure(MetricConfig config, long now) {
         metrics.addMetric(metricName, availableBytes);
     }
 
+    private void setPartition(AppendCallbacks callbacks, int partition) {
+        if (callbacks != null)
+            callbacks.setPartition(partition);
+    }
+
     /**
      * Add a record to the accumulator, return the append result
      * <p>
      * The append result will contain the future metadata, and flag for 
whether the appended batch is full or a new batch is created
      * <p>
      *
-     * @param tp The topic/partition to which this record is being sent
+     * @param topic The topic to which this record is being sent
+     * @param partition The partition to which this record is being sent or 
RecordMetadata.UNKNOWN_PARTITION
+     *                  if any partition could be used
      * @param timestamp The timestamp of the record
      * @param key The key for the record
      * @param value The value for the record
      * @param headers the Headers for the record
-     * @param callback The user-supplied callback to execute when the request 
is complete
+     * @param callbacks The callbacks to execute
      * @param maxTimeToBlock The maximum time in milliseconds to block for 
buffer memory to be available
      * @param abortOnNewBatch A boolean that indicates returning before a new 
batch is created and
      *                        running the partitioner's onNewBatch method 
before trying to append again
      * @param nowMs The current time, in milliseconds
+     * @param cluster The cluster metadata
      */
-    public RecordAppendResult append(TopicPartition tp,
+    public RecordAppendResult append(String topic,
+                                     int partition,
                                      long timestamp,
                                      byte[] key,
                                      byte[] value,
                                      Header[] headers,
-                                     Callback callback,
+                                     AppendCallbacks callbacks,
                                      long maxTimeToBlock,
                                      boolean abortOnNewBatch,
-                                     long nowMs) throws InterruptedException {
+                                     long nowMs,
+                                     Cluster cluster) throws 
InterruptedException {
+        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new 
TopicInfo(k, batchSize));
+
         // We keep track of the number of appending thread to make sure we do 
not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
         ByteBuffer buffer = null;
         if (headers == null) headers = Record.EMPTY_HEADERS;
         try {
-            // check if we have an in-progress batch
-            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
-            synchronized (dq) {
-                if (closed)
-                    throw new KafkaException("Producer closed while send in 
progress");
-                RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq, nowMs);
-                if (appendResult != null)
-                    return appendResult;
-            }
+            // Loop to retry in case we encounter partitioner's race 
conditions.
+            while (true) {
+                // If the message doesn't have any partition affinity, so we 
pick a partition based on the broker
+                // availability and performance.  Note, that here we peek 
current partition before we hold the
+                // deque lock, so we'll need to make sure that it's not 
changed while we were waiting for the
+                // deque lock.
+                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
+                final int effectivePartition;
+                if (partition == RecordMetadata.UNKNOWN_PARTITION) {
+                    partitionInfo = 
topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
+                    effectivePartition = partitionInfo.partition();
+                } else {
+                    partitionInfo = null;
+                    effectivePartition = partition;
+                }
 
-            // we don't have an in-progress record batch try to allocate a new 
batch
-            if (abortOnNewBatch) {
-                // Return a result that will cause another call to append.
-                return new RecordAppendResult(null, false, false, true);
-            }
+                // Now that we know the effective partition, let the caller 
know.
+                setPartition(callbacks, effectivePartition);
+
+                // check if we have an in-progress batch
+                Deque<ProducerBatch> dq = 
topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition 
hasn't changed and retry.
+                    if 
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo))
+                        continue;
+                    RecordAppendResult appendResult = tryAppend(timestamp, 
key, value, headers, callbacks, dq, nowMs);
+                    if (appendResult != null) {
+                        
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 
appendResult.appendedBytes, cluster);
+                        return appendResult;
+                    }
+                }
 
-            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
-            log.trace("Allocating a new {} byte message buffer for topic {} 
partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), 
maxTimeToBlock);
-            buffer = free.allocate(size, maxTimeToBlock);
+                // we don't have an in-progress record batch try to allocate a 
new batch
+                if (abortOnNewBatch) {
+                    // Return a result that will cause another call to append.
+                    return new RecordAppendResult(null, false, false, true, 0);
+                }
 
-            // Update the current time in case the buffer allocation blocked 
above.
-            nowMs = time.milliseconds();
-            synchronized (dq) {
-                // Need to check if producer is closed again after grabbing 
the dequeue lock.
-                if (closed)
-                    throw new KafkaException("Producer closed while send in 
progress");
+                if (buffer == null) {
+                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+                    int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
+                    log.trace("Allocating a new {} byte message buffer for 
topic {} partition {} with remaining timeout {}ms", size, topic, partition, 
maxTimeToBlock);
+                    buffer = free.allocate(size, maxTimeToBlock);
+                }
 
-                RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq, nowMs);
-                if (appendResult != null) {
-                    // Somebody else found us a batch, return the one we 
waited for! Hopefully this doesn't happen often...
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition 
hasn't changed and retry.
+                    if 
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo))
+                        continue;
+                    RecordAppendResult appendResult = appendNewBatch(topic, 
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+                    // Don't deallocate this buffer in the finally block as 
it's being used in the record batch

Review Comment:
   This comment can be a bit confusing since we always call deallocate in the finally block.
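
   For illustration, one way to make this less ambiguous (a sketch only, not the change adopted in the PR) is to keep the old null guard from the removed lines and tie the comment to the point where the buffer reference is nulled out:

       // inside the synchronized (dq) block:
       if (appendResult.newBatchCreated)
           buffer = null;  // ownership of the buffer moved to the new batch, so it must not be freed below

       // in the finally block:
       if (buffer != null)
           free.deallocate(buffer);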



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -833,6 +1079,32 @@ public void close() {
         this.free.close();
     }
 
+    /**
+     * Partitioner config for built-in paritioner
+     */
+    public static final class PartitionerConfig {
+        private final boolean enableAdaptivePartitioning;
+        private final long partitionAvailabilityTimeoutMs;
+
+        /**
+         * Partitioner config
+         *
+         * @param enableAdaptivePartitioning If it's true, partition switching 
adapts to broker load, otherwise parition

Review Comment:
   typo parition



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -546,6 +554,76 @@ public void testMetadataTopicExpiry() throws Exception {
         assertTrue(future.isDone(), "Request should be completed");
     }
 
+    @Test
+    public void testNodeLatencyStats() throws Exception {
+        try (Metrics m = new Metrics()) {
+            // Create a new record accumulator with non-0 
partitionAvailabilityTimeoutMs
+            // otherwise it wouldn't update the stats.
+            RecordAccumulator.PartitionerConfig config = new 
RecordAccumulator.PartitionerConfig(false, 42);
+            long totalSize = 1024 * 1024;
+            accumulator = new RecordAccumulator(logContext, batchSize, 
CompressionType.NONE, 0, 0L,
+                DELIVERY_TIMEOUT_MS, config, m, "producer-metrics", time, 
apiVersions, null,
+                new BufferPool(totalSize, batchSize, m, time, 
"producer-internal-metrics"));
+
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+            Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1,
+                senderMetrics, time, REQUEST_TIMEOUT, 1000L, null, new 
ApiVersions());
+
+            // Produce and send batch.
+            long time1 = time.milliseconds();
+            appendToAccumulator(tp0, 0L, "key", "value");
+            sender.runOnce();
+            assertEquals(1, client.inFlightRequestCount(), "We should have a 
single produce request in flight.");
+
+            // We were able to send the batch out, so both the ready and drain 
values should be the same.
+            RecordAccumulator.NodeLatencyStats stats = 
accumulator.getNodeLatencyStats(0);
+            assertEquals(time1, stats.drainTimeMs);
+            assertEquals(time1, stats.readyTimeMs);
+
+            // Make client not ready.

Review Comment:
   Node 1 not ready?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -960,8 +1002,10 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, 
V> record, Callback call
                         " to class " + 
producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() 
+
                         " specified in value.serializer", cce);
             }
+
+            // Try to calculate partition, but note that after this call it 
can be RecordMetadata.UNKNOWN_PARTITION,
+            // which means that the RecordAccumulator would pick a partition 
based on broker load.

Review Comment:
   The comment is not very accurate. RecordAccumulator only picks a partition based on broker load if enableAdaptivePartitioning is true.
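
   A possible rewording along those lines (a sketch, assuming that reading of enableAdaptivePartitioning is what ends up in the code):

       // Try to calculate the partition, but note that after this call it can be
       // RecordMetadata.UNKNOWN_PARTITION, which means that the RecordAccumulator
       // will choose a partition using its built-in partitioner: adaptively, based
       // on broker load, only when enableAdaptivePartitioning is true, and uniformly
       // among available partitions otherwise.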



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that 
is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for 
adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per 
topic.
+ */
+public class BuiltInPartitioner {
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = 
new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(String topic, int stickyBatchSize) {
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load 
stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : 
Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's 
disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = 
cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0)
+                return availablePartitions.get(random % 
availablePartitions.size()).partition();
+
+            // We don't have available partitions, just pick one among all 
partitions.
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            return random % partitions.size();
+        } else {
+            // Calculate next partition based on load distribution.
+            assert partitionLoadStats.length > 0;
+
+            int[] cumulativeFrequencyTable = 
partitionLoadStats.cumulativeFrequencyTable;
+            int weightedRandom = random % 
cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+            // By construction, the cumulative frequency table is sorted, so 
we can use binary
+            // search to find the desired index.
+            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 
0, partitionLoadStats.length, weightedRandom);
+
+            // binarySearch results the index of the found element, or 
-(insertion_point) - 1
+            // (where insertion_point is the index of the first element 
greater than the key).
+            // We need to get the index of the first value that is strictly 
greater, which
+            // would be the insertion point, except if we found the element 
that's equal to
+            // the searched value (in this case we need to get next).  For 
example, if we have
+            //  4 5 8
+            // and we're looking for 3, then we'd get the insertion_point = 0, 
and the function
+            // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're 
looking for 4, we'd
+            // get 0, and we need the next one, so adding 1 works here as well.
+            int partitionIndex = Math.abs(searchResult + 1);
+            assert partitionIndex < partitionLoadStats.length;
+            return partitionLoadStats.partitionIds[partitionIndex];
+        }
+    }
+
+    /**
+     * Test-only function.  When partition load stats are defined, return the 
end of range for the
+     * random number.
+     */
+    public int loadStatsRangeEnd() {
+        assert partitionLoadStats != null;
+        assert partitionLoadStats.length > 0;
+        return 
partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+    }
+
+    /**
+     * Peek currently chosen sticky partition.  This method works in 
conjunction with {@link #isPartitionChanged}
+     * and {@link #updatePartitionInfo}.  The workflow is the following:
+     *
+     * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+     * 2. Lock partition's batch queue.
+     * 3. isPartitionChanged under lock to make sure that nobody raced us.
+     * 4. Append data to buffer.
+     * 5. updatePartitionInfo to update produced bytes and maybe switch 
partition.
+     *
+     *  It's important that steps 3-5 are under partition's batch queue lock.
+     *
+     * @param cluster The cluster information (needed if there is no current 
partition)
+     * @return sticky partition info object
+     */
+    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+        if (partitionInfo != null)
+            return partitionInfo;
+
+        // We're the first to create it.
+        int partition = nextPartition(cluster);
+        partitionInfo = new StickyPartitionInfo(partition);
+        if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
+            return partitionInfo;
+
+        // Someone has raced us.
+        return stickyPartitionInfo.get();
+    }
+
+    /**
+     * Check if partition is changed by a concurrent thread.  NOTE this 
function needs to be called under
+     * the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by 
peekCurrentPartitionInfo
+     * @return true if sticky partition object is changed (race condition)
+     */
+    boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
+        return partitionInfo != null && stickyPartitionInfo.get() != 
partitionInfo;

Review Comment:
   Should we assert that partitionInfo is not null?
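
   For reference, the suggested assertion might look like the sketch below. This is only an illustration: the accumulator hunk above passes a null partitionInfo when an explicit partition is given, so the assertion would only hold if those call sites skipped the check instead.

       boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
           // Hypothetical invariant suggested by the review; today null means an explicit partition was used.
           assert partitionInfo != null;
           return stickyPartitionInfo.get() != partitionInfo;
       }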



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -160,91 +218,156 @@ public double measure(MetricConfig config, long now) {
         metrics.addMetric(metricName, availableBytes);
     }
 
+    private void setPartition(AppendCallbacks callbacks, int partition) {
+        if (callbacks != null)
+            callbacks.setPartition(partition);
+    }
+
     /**
      * Add a record to the accumulator, return the append result
      * <p>
      * The append result will contain the future metadata, and flag for 
whether the appended batch is full or a new batch is created
      * <p>
      *
-     * @param tp The topic/partition to which this record is being sent
+     * @param topic The topic to which this record is being sent
+     * @param partition The partition to which this record is being sent or 
RecordMetadata.UNKNOWN_PARTITION
+     *                  if any partition could be used
      * @param timestamp The timestamp of the record
      * @param key The key for the record
      * @param value The value for the record
      * @param headers the Headers for the record
-     * @param callback The user-supplied callback to execute when the request 
is complete
+     * @param callbacks The callbacks to execute
      * @param maxTimeToBlock The maximum time in milliseconds to block for 
buffer memory to be available
      * @param abortOnNewBatch A boolean that indicates returning before a new 
batch is created and
      *                        running the partitioner's onNewBatch method 
before trying to append again
      * @param nowMs The current time, in milliseconds
+     * @param cluster The cluster metadata
      */
-    public RecordAppendResult append(TopicPartition tp,
+    public RecordAppendResult append(String topic,
+                                     int partition,
                                      long timestamp,
                                      byte[] key,
                                      byte[] value,
                                      Header[] headers,
-                                     Callback callback,
+                                     AppendCallbacks callbacks,
                                      long maxTimeToBlock,
                                      boolean abortOnNewBatch,
-                                     long nowMs) throws InterruptedException {
+                                     long nowMs,
+                                     Cluster cluster) throws 
InterruptedException {
+        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new 
TopicInfo(k, batchSize));
+
         // We keep track of the number of appending thread to make sure we do 
not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
         ByteBuffer buffer = null;
         if (headers == null) headers = Record.EMPTY_HEADERS;
         try {
-            // check if we have an in-progress batch
-            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
-            synchronized (dq) {
-                if (closed)
-                    throw new KafkaException("Producer closed while send in 
progress");
-                RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq, nowMs);
-                if (appendResult != null)
-                    return appendResult;
-            }
+            // Loop to retry in case we encounter partitioner's race 
conditions.
+            while (true) {
+                // If the message doesn't have any partition affinity, so we 
pick a partition based on the broker
+                // availability and performance.  Note, that here we peek 
current partition before we hold the
+                // deque lock, so we'll need to make sure that it's not 
changed while we were waiting for the
+                // deque lock.
+                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
+                final int effectivePartition;
+                if (partition == RecordMetadata.UNKNOWN_PARTITION) {
+                    partitionInfo = 
topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
+                    effectivePartition = partitionInfo.partition();
+                } else {
+                    partitionInfo = null;
+                    effectivePartition = partition;
+                }
 
-            // we don't have an in-progress record batch try to allocate a new 
batch
-            if (abortOnNewBatch) {
-                // Return a result that will cause another call to append.
-                return new RecordAppendResult(null, false, false, true);
-            }
+                // Now that we know the effective partition, let the caller 
know.
+                setPartition(callbacks, effectivePartition);
+
+                // check if we have an in-progress batch
+                Deque<ProducerBatch> dq = 
topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition 
hasn't changed and retry.
+                    if 
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo))
+                        continue;
+                    RecordAppendResult appendResult = tryAppend(timestamp, 
key, value, headers, callbacks, dq, nowMs);
+                    if (appendResult != null) {
+                        
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 
appendResult.appendedBytes, cluster);
+                        return appendResult;
+                    }
+                }
 
-            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
-            log.trace("Allocating a new {} byte message buffer for topic {} 
partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), 
maxTimeToBlock);
-            buffer = free.allocate(size, maxTimeToBlock);
+                // we don't have an in-progress record batch try to allocate a 
new batch
+                if (abortOnNewBatch) {
+                    // Return a result that will cause another call to append.
+                    return new RecordAppendResult(null, false, false, true, 0);
+                }
 
-            // Update the current time in case the buffer allocation blocked 
above.
-            nowMs = time.milliseconds();
-            synchronized (dq) {
-                // Need to check if producer is closed again after grabbing 
the dequeue lock.
-                if (closed)
-                    throw new KafkaException("Producer closed while send in 
progress");
+                if (buffer == null) {
+                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+                    int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
+                    log.trace("Allocating a new {} byte message buffer for 
topic {} partition {} with remaining timeout {}ms", size, topic, partition, 
maxTimeToBlock);
+                    buffer = free.allocate(size, maxTimeToBlock);
+                }
 
-                RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq, nowMs);
-                if (appendResult != null) {
-                    // Somebody else found us a batch, return the one we 
waited for! Hopefully this doesn't happen often...
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition 
hasn't changed and retry.
+                    if 
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo))
+                        continue;
+                    RecordAppendResult appendResult = appendNewBatch(topic, 
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+                    // Don't deallocate this buffer in the finally block as 
it's being used in the record batch
+                    if (appendResult.newBatchCreated)
+                        buffer = null;
+                    
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 
appendResult.appendedBytes, cluster);
                     return appendResult;
                 }
-
-                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, 
maxUsableMagic);
-                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, 
nowMs);
-                FutureRecordMetadata future = 
Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
-                        callback, nowMs));
-
-                dq.addLast(batch);
-                incomplete.add(batch);
-
-                // Don't deallocate this buffer in the finally block as it's 
being used in the record batch
-                buffer = null;
-                return new RecordAppendResult(future, dq.size() > 1 || 
batch.isFull(), true, false);
             }
         } finally {
-            if (buffer != null)
-                free.deallocate(buffer);
+            free.deallocate(buffer);
             appendsInProgress.decrementAndGet();
         }
     }
 
+    /**
+     * Append a new batch to the queue
+     *
+     * @param topic The topic
+     * @param partition The partition (cannot be 
RecordMetadata.UNKNOWN_PARTITION)
+     * @param dq The queue
+     * @param timestamp The timestamp of the record
+     * @param key The key for the record
+     * @param value The value for the record
+     * @param headers the Headers for the record
+     * @param callbacks The callbacks to execute
+     * @param buffer The buffer for the new batch
+     */
+    private RecordAppendResult appendNewBatch(String topic,
+                                              int partition,
+                                              Deque<ProducerBatch> dq,
+                                              long timestamp,
+                                              byte[] key,
+                                              byte[] value,
+                                              Header[] headers,
+                                              AppendCallbacks callbacks,
+                                              ByteBuffer buffer) throws 
InterruptedException {
+        assert partition != RecordMetadata.UNKNOWN_PARTITION;
+
+        // Update the current time in case the buffer allocation blocked above.

Review Comment:
   We used to check if the producer is closed here. Is that still needed?
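
   For reference, the check that the previous code performed after re-acquiring the deque lock is visible in the removed lines above; if it is still needed, restoring it at the top of appendNewBatch would look roughly like this:

       // Need to check if producer is closed again after grabbing the dequeue lock.
       if (closed)
           throw new KafkaException("Producer closed while send in progress");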



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -25,12 +23,15 @@
 
     private final Cluster cluster;
     private final Serializer<K> keySerializer;
-    private final DefaultPartitioner defaultPartitioner;
 
+    @SuppressWarnings("deprecation")
+    private final 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
defaultPartitioner;
+
+    @SuppressWarnings("deprecation")
     public DefaultStreamPartitioner(final Serializer<K> keySerializer, final 
Cluster cluster) {
         this.cluster = cluster;
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new DefaultPartitioner();
+        this.defaultPartitioner = new 
org.apache.kafka.clients.producer.internals.DefaultPartitioner();

Review Comment:
   I am wondering if the DefaultStreamPartitioner should be the built-in partitioner.
   
   @guozhangwang : What do you think?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -833,6 +1079,32 @@ public void close() {
         this.free.close();
     }
 
+    /**
+     * Partitioner config for built-in paritioner

Review Comment:
   typo paritioner



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -833,6 +1079,32 @@ public void close() {
         this.free.close();
     }
 
+    /**
+     * Partitioner config for built-in paritioner
+     */
+    public static final class PartitionerConfig {
+        private final boolean enableAdaptivePartitioning;
+        private final long partitionAvailabilityTimeoutMs;
+
+        /**
+         * Partitioner config
+         *
+         * @param enableAdaptivePartitioning If it's true, partition switching 
adapts to broker load, otherwise parition
+         *        swiching is random.

Review Comment:
   typo swiching



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -160,91 +218,156 @@ public double measure(MetricConfig config, long now) {
         metrics.addMetric(metricName, availableBytes);
     }
 
+    private void setPartition(AppendCallbacks callbacks, int partition) {
+        if (callbacks != null)
+            callbacks.setPartition(partition);
+    }
+
     /**
      * Add a record to the accumulator, return the append result
      * <p>
      * The append result will contain the future metadata, and flag for 
whether the appended batch is full or a new batch is created
      * <p>
      *
-     * @param tp The topic/partition to which this record is being sent
+     * @param topic The topic to which this record is being sent
+     * @param partition The partition to which this record is being sent or 
RecordMetadata.UNKNOWN_PARTITION
+     *                  if any partition could be used
      * @param timestamp The timestamp of the record
      * @param key The key for the record
      * @param value The value for the record
      * @param headers the Headers for the record
-     * @param callback The user-supplied callback to execute when the request 
is complete
+     * @param callbacks The callbacks to execute
      * @param maxTimeToBlock The maximum time in milliseconds to block for 
buffer memory to be available
      * @param abortOnNewBatch A boolean that indicates returning before a new 
batch is created and
      *                        running the partitioner's onNewBatch method 
before trying to append again
      * @param nowMs The current time, in milliseconds
+     * @param cluster The cluster metadata
      */
-    public RecordAppendResult append(TopicPartition tp,
+    public RecordAppendResult append(String topic,
+                                     int partition,
                                      long timestamp,
                                      byte[] key,
                                      byte[] value,
                                      Header[] headers,
-                                     Callback callback,
+                                     AppendCallbacks callbacks,
                                      long maxTimeToBlock,
                                      boolean abortOnNewBatch,
-                                     long nowMs) throws InterruptedException {
+                                     long nowMs,
+                                     Cluster cluster) throws 
InterruptedException {
+        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new 
TopicInfo(k, batchSize));
+
         // We keep track of the number of appending thread to make sure we do 
not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
         ByteBuffer buffer = null;
         if (headers == null) headers = Record.EMPTY_HEADERS;
         try {
-            // check if we have an in-progress batch
-            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
-            synchronized (dq) {
-                if (closed)
-                    throw new KafkaException("Producer closed while send in 
progress");
-                RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq, nowMs);
-                if (appendResult != null)
-                    return appendResult;
-            }
+            // Loop to retry in case we encounter partitioner's race 
conditions.
+            while (true) {
+                // If the message doesn't have any partition affinity, so we 
pick a partition based on the broker
+                // availability and performance.  Note, that here we peek 
current partition before we hold the
+                // deque lock, so we'll need to make sure that it's not 
changed while we were waiting for the
+                // deque lock.
+                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
+                final int effectivePartition;
+                if (partition == RecordMetadata.UNKNOWN_PARTITION) {
+                    partitionInfo = 
topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
+                    effectivePartition = partitionInfo.partition();
+                } else {
+                    partitionInfo = null;
+                    effectivePartition = partition;
+                }
 
-            // we don't have an in-progress record batch try to allocate a new 
batch
-            if (abortOnNewBatch) {
-                // Return a result that will cause another call to append.
-                return new RecordAppendResult(null, false, false, true);
-            }
+                // Now that we know the effective partition, let the caller 
know.
+                setPartition(callbacks, effectivePartition);
+
+                // check if we have an in-progress batch
+                Deque<ProducerBatch> dq = 
topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition 
hasn't changed and retry.
+                    if 
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo))
+                        continue;
+                    RecordAppendResult appendResult = tryAppend(timestamp, 
key, value, headers, callbacks, dq, nowMs);
+                    if (appendResult != null) {
+                        
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 
appendResult.appendedBytes, cluster);
+                        return appendResult;
+                    }
+                }
 
-            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
-            log.trace("Allocating a new {} byte message buffer for topic {} 
partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), 
maxTimeToBlock);
-            buffer = free.allocate(size, maxTimeToBlock);
+                // we don't have an in-progress record batch try to allocate a 
new batch
+                if (abortOnNewBatch) {
+                    // Return a result that will cause another call to append.
+                    return new RecordAppendResult(null, false, false, true, 0);
+                }
 
-            // Update the current time in case the buffer allocation blocked 
above.
-            nowMs = time.milliseconds();
-            synchronized (dq) {
-                // Need to check if producer is closed again after grabbing 
the dequeue lock.
-                if (closed)
-                    throw new KafkaException("Producer closed while send in 
progress");
+                if (buffer == null) {
+                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+                    int size = Math.max(this.batchSize, 
AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, 
value, headers));
+                    log.trace("Allocating a new {} byte message buffer for 
topic {} partition {} with remaining timeout {}ms", size, topic, partition, 
maxTimeToBlock);
+                    buffer = free.allocate(size, maxTimeToBlock);
+                }
 
-                RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq, nowMs);
-                if (appendResult != null) {
-                    // Somebody else found us a batch, return the one we 
waited for! Hopefully this doesn't happen often...
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition 
hasn't changed and retry.
+                    if 
(topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo))
+                        continue;
+                    RecordAppendResult appendResult = appendNewBatch(topic, 
effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+                    // Don't deallocate this buffer in the finally block as 
it's being used in the record batch
+                    if (appendResult.newBatchCreated)
+                        buffer = null;
+                    
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 
appendResult.appendedBytes, cluster);
                     return appendResult;
                 }
-
-                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, 
maxUsableMagic);
-                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, 
nowMs);
-                FutureRecordMetadata future = 
Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
-                        callback, nowMs));
-
-                dq.addLast(batch);
-                incomplete.add(batch);
-
-                // Don't deallocate this buffer in the finally block as it's 
being used in the record batch
-                buffer = null;
-                return new RecordAppendResult(future, dq.size() > 1 || 
batch.isFull(), true, false);
             }
         } finally {
-            if (buffer != null)
-                free.deallocate(buffer);
+            free.deallocate(buffer);
             appendsInProgress.decrementAndGet();
         }
     }
 
+    /**
+     * Append a new batch to the queue
+     *
+     * @param topic The topic
+     * @param partition The partition (cannot be 
RecordMetadata.UNKNOWN_PARTITION)
+     * @param dq The queue
+     * @param timestamp The timestamp of the record
+     * @param key The key for the record
+     * @param value The value for the record
+     * @param headers the Headers for the record
+     * @param callbacks The callbacks to execute
+     * @param buffer The buffer for the new batch
+     */
+    private RecordAppendResult appendNewBatch(String topic,
+                                              int partition,
+                                              Deque<ProducerBatch> dq,
+                                              long timestamp,
+                                              byte[] key,
+                                              byte[] value,
+                                              Header[] headers,
+                                              AppendCallbacks callbacks,
+                                              ByteBuffer buffer) throws 
InterruptedException {

Review Comment:
   It seems this method doesn't throw InterruptedException?
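
   If nothing in the body actually blocks, one option (a sketch; it assumes no caller relies on the declared exception) is simply to drop the checked exception from the signature:

       private RecordAppendResult appendNewBatch(String topic,
                                                 int partition,
                                                 Deque<ProducerBatch> dq,
                                                 long timestamp,
                                                 byte[] key,
                                                 byte[] value,
                                                 Header[] headers,
                                                 AppendCallbacks callbacks,
                                                 ByteBuffer buffer) {
           // ... body unchanged ...
       }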



##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;

Review Comment:
   There is also a reference to DefaultPartitioner in the javadoc below.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1017,6 +1024,158 @@ public void testStickyBatches() throws Exception {
         assertEquals(appends, 2 * expectedAppends);
     }
 
+    @Test
+    public void testUniformBuiltInPartitioner() throws Exception {
+
+        try {
+            // Mock random number generator with just sequential integer.
+            AtomicInteger mockRandom = new AtomicInteger();
+            BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
+
+            long totalSize = 1024 * 1024;
+            int batchSize = 128;  // note that this is also a "sticky" limit 
for the partitioner
+            RecordAccumulator accum = createTestRecordAccumulator(batchSize, 
totalSize, CompressionType.NONE, 0);
+
+            // Set up callbacks so that we know what partition is chosen.
+            final int[] partition = {RecordMetadata.UNKNOWN_PARTITION};

Review Comment:
   Do we need an array since we only return one partition?
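
   One common alternative to the single-element array trick for capturing the value from the callback (a sketch; the surrounding test code is assumed rather than quoted in full, and whether it reads better is a style call):

       AtomicInteger chosenPartition = new AtomicInteger(RecordMetadata.UNKNOWN_PARTITION);
       RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() {
           @Override
           public void setPartition(int partition) {
               chosenPartition.set(partition);  // record the partition chosen by the accumulator
           }

           @Override
           public void onCompletion(RecordMetadata metadata, Exception exception) {
           }
       };
       // assertions can then read chosenPartition.get() instead of partition[0]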



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.Record;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BuiltInPartitionerTest {
+    private final static Node[] NODES = new Node[] {
+        new Node(0, "localhost", 99),
+        new Node(1, "localhost", 100),
+        new Node(2, "localhost", 101),
+        new Node(11, "localhost", 102)
+    };
+    final static String TOPIC_A = "topicA";
+    final static String TOPIC_B = "topicB";
+    final static String TOPIC_C = "topicC";
+    final static Header[] EMPTY_HEADERS = Record.EMPTY_HEADERS;

Review Comment:
   EMPTY_HEADERS is no longer referenced.



##########
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java:
##########
@@ -117,10 +116,24 @@ public MockProducer(final Cluster cluster,
      *
      * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, 
Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new 
DefaultPartitioner(), keySerializer, valueSerializer)}
      */
+    @SuppressWarnings("deprecation")
     public MockProducer(final boolean autoComplete,
                         final Serializer<K> keySerializer,
                         final Serializer<V> valueSerializer) {
-        this(Cluster.empty(), autoComplete, new DefaultPartitioner(), 
keySerializer, valueSerializer);
+        this(Cluster.empty(), autoComplete, new 
org.apache.kafka.clients.producer.internals.DefaultPartitioner(), 
keySerializer, valueSerializer);

Review Comment:
   Should the partitioner be the built-in one now?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that 
is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for 
adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per 
topic.
+ */
+public class BuiltInPartitioner {
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = 
new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(String topic, int stickyBatchSize) {
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load 
stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : 
Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's 
disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = 
cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0)
+                return availablePartitions.get(random % 
availablePartitions.size()).partition();
+
+            // We don't have available partitions, just pick one among all 
partitions.
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            return random % partitions.size();
+        } else {
+            // Calculate next partition based on load distribution.
+            assert partitionLoadStats.length > 0;
+
+            int[] cumulativeFrequencyTable = 
partitionLoadStats.cumulativeFrequencyTable;
+            int weightedRandom = random % 
cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+            // By construction, the cumulative frequency table is sorted, so 
we can use binary
+            // search to find the desired index.
+            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 
0, partitionLoadStats.length, weightedRandom);
+
+            // binarySearch results the index of the found element, or 
-(insertion_point) - 1
+            // (where insertion_point is the index of the first element 
greater than the key).
+            // We need to get the index of the first value that is strictly 
greater, which
+            // would be the insertion point, except if we found the element 
that's equal to
+            // the searched value (in this case we need to get next).  For 
example, if we have
+            //  4 5 8
+            // and we're looking for 3, then we'd get the insertion_point = 0, 
and the function
+            // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're 
looking for 4, we'd
+            // get 0, and we need the next one, so adding 1 works here as well.
+            int partitionIndex = Math.abs(searchResult + 1);
+            assert partitionIndex < partitionLoadStats.length;
+            return partitionLoadStats.partitionIds[partitionIndex];
+        }
+    }
+
+    /**
+     * Test-only function.  When partition load stats are defined, return the 
end of range for the
+     * random number.
+     */
+    public int loadStatsRangeEnd() {
+        assert partitionLoadStats != null;
+        assert partitionLoadStats.length > 0;
+        return 
partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+    }
+
+    /**
+     * Peek currently chosen sticky partition.  This method works in 
conjunction with {@link #isPartitionChanged}
+     * and {@link #updatePartitionInfo}.  The workflow is the following:
+     *
+     * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+     * 2. Lock partition's batch queue.
+     * 3. isPartitionChanged under lock to make sure that nobody raced us.
+     * 4. Append data to buffer.
+     * 5. updatePartitionInfo to update produced bytes and maybe switch 
partition.
+     *
+     *  It's important that steps 3-5 are under partition's batch queue lock.
+     *
+     * @param cluster The cluster information (needed if there is no current 
partition)
+     * @return sticky partition info object
+     */
+    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+        if (partitionInfo != null)
+            return partitionInfo;
+
+        // We're the first to create it.
+        int partition = nextPartition(cluster);
+        partitionInfo = new StickyPartitionInfo(partition);
+        if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
+            return partitionInfo;
+
+        // Someone has raced us.
+        return stickyPartitionInfo.get();
+    }
+
+    /**
+     * Check if partition is changed by a concurrent thread.  NOTE this 
function needs to be called under
+     * the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by 
peekCurrentPartitionInfo
+     * @return true if sticky partition object is changed (race condition)
+     */
+    boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
+        return partitionInfo != null && stickyPartitionInfo.get() != 
partitionInfo;
+    }
+
+    /**
+     * Update partition info with the number of bytes appended and maybe 
switch partition.
+     * NOTE this function needs to be called under the partition's batch queue 
lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by 
peekCurrentPartitionInfo
+     * @param appendedBytes The number of bytes appended to this partition
+     * @param cluster The cluster information
+     */
+    void updatePartitionInfo(StickyPartitionInfo partitionInfo, int 
appendedBytes, Cluster cluster) {
+        if (partitionInfo == null)

Review Comment:
   Should we assert that partitionInfo is not null?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that 
is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for 
adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per 
topic.
+ */
+public class BuiltInPartitioner {
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(String topic, int stickyBatchSize) {
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0)
+                return availablePartitions.get(random % availablePartitions.size()).partition();
+
+            // We don't have available partitions, just pick one among all partitions.
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            return random % partitions.size();
+        } else {
+            // Calculate next partition based on load distribution.
+            assert partitionLoadStats.length > 0;
+
+            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;

Review Comment:
   In the adaptive case, it seems that we should also avoid selecting partitions with no leader, if possible?
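For illustration, one shape such a check could take inside the adaptive branch of nextPartition (a sketch only; uniformNextPartition is a hypothetical helper standing in for the existing non-adaptive fallback, and TopicPartition would need to be imported):

```java
// After the weighted pick from the cumulative frequency table:
int partitionIndex = Math.abs(searchResult + 1);
int chosen = partitionLoadStats.partitionIds[partitionIndex];

// Skip partitions that currently have no leader, falling back to the uniform path.
if (cluster.leaderFor(new TopicPartition(topic, chosen)) == null)
    return uniformNextPartition(cluster, random);  // hypothetical helper: the non-adaptive branch
return chosen;
```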



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -467,22 +466,31 @@ public void testAppendInExpiryCallback() throws InterruptedException {
         final byte[] key = "key".getBytes();
         final byte[] value = "value".getBytes();
         final long maxBlockTimeMs = 1000;
-        Callback callback = (metadata, exception) -> {
-            if (exception instanceof TimeoutException) {
-                expiryCallbackCount.incrementAndGet();
-                try {
-                    accumulator.append(tp1, 0L, key, value,
-                        Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
-                } catch (InterruptedException e) {
-                    throw new RuntimeException("Unexpected interruption", e);
-                }
-            } else if (exception != null)
-                unexpectedException.compareAndSet(null, exception);
+        Cluster cluster = TestUtils.singletonCluster();
+        RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() {
+            @Override
+            public void setPartition(int partition) {
+            }
+
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception instanceof TimeoutException) {
+                    expiryCallbackCount.incrementAndGet();
+                    try {
+                        accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value,
+                            Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Unexpected interruption", e);
+                    }
+                } else if (exception != null)
+                    unexpectedException.compareAndSet(null, exception);
+

Review Comment:
   extra empty line.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per topic.
+ */
+public class BuiltInPartitioner {
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(String topic, int stickyBatchSize) {
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0)
+                return availablePartitions.get(random % availablePartitions.size()).partition();
+
+            // We don't have available partitions, just pick one among all partitions.
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            return random % partitions.size();
+        } else {
+            // Calculate next partition based on load distribution.
+            assert partitionLoadStats.length > 0;
+
+            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
+            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+            // By construction, the cumulative frequency table is sorted, so we can use binary
+            // search to find the desired index.
+            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
+
+            // binarySearch results the index of the found element, or -(insertion_point) - 1
+            // (where insertion_point is the index of the first element greater than the key).
+            // We need to get the index of the first value that is strictly greater, which
+            // would be the insertion point, except if we found the element that's equal to
+            // the searched value (in this case we need to get next).  For example, if we have
+            //  4 5 8
+            // and we're looking for 3, then we'd get the insertion_point = 0, and the function
+            // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're looking for 4, we'd
+            // get 0, and we need the next one, so adding 1 works here as well.
+            int partitionIndex = Math.abs(searchResult + 1);
+            assert partitionIndex < partitionLoadStats.length;
+            return partitionLoadStats.partitionIds[partitionIndex];
+        }
+    }
+
+    /**
+     * Test-only function.  When partition load stats are defined, return the end of range for the
+     * random number.
+     */
+    public int loadStatsRangeEnd() {
+        assert partitionLoadStats != null;
+        assert partitionLoadStats.length > 0;
+        return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+    }
+
+    /**
+     * Peek currently chosen sticky partition.  This method works in conjunction with {@link #isPartitionChanged}
+     * and {@link #updatePartitionInfo}.  The workflow is the following:
+     *
+     * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+     * 2. Lock partition's batch queue.
+     * 3. isPartitionChanged under lock to make sure that nobody raced us.
+     * 4. Append data to buffer.
+     * 5. updatePartitionInfo to update produced bytes and maybe switch partition.
+     *
+     *  It's important that steps 3-5 are under partition's batch queue lock.
+     *
+     * @param cluster The cluster information (needed if there is no current partition)
+     * @return sticky partition info object
+     */
+    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+        if (partitionInfo != null)
+            return partitionInfo;
+
+        // We're the first to create it.
+        int partition = nextPartition(cluster);
+        partitionInfo = new StickyPartitionInfo(partition);
+        if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
+            return partitionInfo;
+
+        // Someone has raced us.
+        return stickyPartitionInfo.get();
+    }
+
+    /**
+     * Check if partition is changed by a concurrent thread.  NOTE this function needs to be called under
+     * the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+     * @return true if sticky partition object is changed (race condition)
+     */
+    boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
+        return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo;
+    }
+
+    /**
+     * Update partition info with the number of bytes appended and maybe switch partition.
+     * NOTE this function needs to be called under the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+     * @param appendedBytes The number of bytes appended to this partition
+     * @param cluster The cluster information
+     */
+    void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) {
+        if (partitionInfo == null)
+            return;
+
+        assert partitionInfo == stickyPartitionInfo.get();
+        int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);
+        if (producedBytes >= stickyBatchSize) {
+            // We've produced enough to this partition, switch to next.
+            int partition = nextPartition(cluster);
+            StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(partition);
+            stickyPartitionInfo.set(newPartitionInfo);
+        }
+    }
+
+    /**
+     * Update partition load stats from the queue sizes of each partition.
+     * NOTE: queueSizes are modified in place to avoid allocations
+     *
+     * @param queueSizes The queue sizes
+     * @param partitionIds The partition ids for the queues
+     * @param length The logical length of the arrays (could be less): we may eliminate some partitions
+     *               based on latency, but to avoid reallocation of the arrays, we just decrement
+     *               logical length
+     * Visible for testing
+     */
+    public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) {
+        if (queueSizes == null) {
+            partitionLoadStats = null;
+            return;
+        }
+        assert queueSizes.length == partitionIds.length;
+        assert length <= queueSizes.length;
+
+        // The queueSizes.length represents the number of all partitions in the topic and if we have
+        // less than 2 partitions, there is no need to do adaptive logic.
+        // If partitioner.availability.timeout.ms != 0, then partitions that experience high latencies
+        // (greater than partitioner.availability.timeout.ms) may be excluded, the length represents
+        // partitions that are not excluded.  If some partitions were excluded, we'd still want to
+        // go through adaptive logic, even if we have one partition.
+        // See also RecordAccumulator#partitionReady where the queueSizes are built.
+        if (length < 1 || queueSizes.length < 2) {

Review Comment:
   Would it be useful to add debug-level logging if length is less than queueSizes.length?
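For illustration, a sketch of what that logging could look like (assumes an slf4j Logger field is added to BuiltInPartitioner; the class as shown above does not declare one):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical field on BuiltInPartitioner:
private static final Logger log = LoggerFactory.getLogger(BuiltInPartitioner.class);

public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) {
    if (queueSizes == null) {
        partitionLoadStats = null;
        return;
    }
    assert queueSizes.length == partitionIds.length;
    assert length <= queueSizes.length;

    // Possible debug logging when some partitions were excluded due to high latency.
    if (length < queueSizes.length)
        log.debug("Topic {}: {} of {} partitions excluded from adaptive partitioning due to latency",
            topic, queueSizes.length - length, queueSizes.length);

    // ... remainder of the method unchanged ...
}
```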



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
