junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r861165914
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -160,91 +218,156 @@ public double measure(MetricConfig config, long now) { metrics.addMetric(metricName, availableBytes); } + private void setPartition(AppendCallbacks callbacks, int partition) { + if (callbacks != null) + callbacks.setPartition(partition); + } + /** * Add a record to the accumulator, return the append result * <p> * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created * <p> * - * @param tp The topic/partition to which this record is being sent + * @param topic The topic to which this record is being sent + * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION + * if any partition could be used * @param timestamp The timestamp of the record * @param key The key for the record * @param value The value for the record * @param headers the Headers for the record - * @param callback The user-supplied callback to execute when the request is complete + * @param callbacks The callbacks to execute * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and * running the partitioner's onNewBatch method before trying to append again * @param nowMs The current time, in milliseconds + * @param cluster The cluster metadata */ - public RecordAppendResult append(TopicPartition tp, + public RecordAppendResult append(String topic, + int partition, long timestamp, byte[] key, byte[] value, Header[] headers, - Callback callback, + AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, - long nowMs) throws InterruptedException { + long nowMs, + Cluster cluster) throws InterruptedException { + TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(k, batchSize)); + 
// We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { - // check if we have an in-progress batch - Deque<ProducerBatch> dq = getOrCreateDeque(tp); - synchronized (dq) { - if (closed) - throw new KafkaException("Producer closed while send in progress"); - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) - return appendResult; - } + // Loop to retry in case we encounter partitioner's race conditions. + while (true) { + // If the message doesn't have any partition affinity, so we pick a partition based on the broker + // availability and performance. Note, that here we peek current partition before we hold the + // deque lock, so we'll need to make sure that it's not changed while we were waiting for the + // deque lock. + final BuiltInPartitioner.StickyPartitionInfo partitionInfo; + final int effectivePartition; + if (partition == RecordMetadata.UNKNOWN_PARTITION) { + partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster); + effectivePartition = partitionInfo.partition(); + } else { + partitionInfo = null; + effectivePartition = partition; + } - // we don't have an in-progress record batch try to allocate a new batch - if (abortOnNewBatch) { - // Return a result that will cause another call to append. - return new RecordAppendResult(null, false, false, true); - } + // Now that we know the effective partition, let the caller know. + setPartition(callbacks, effectivePartition); + + // check if we have an in-progress batch + Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>()); + synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. 
+ if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) + continue; + RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs); + if (appendResult != null) { + topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster); + return appendResult; + } + } - byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); - int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); - log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock); - buffer = free.allocate(size, maxTimeToBlock); + // we don't have an in-progress record batch try to allocate a new batch + if (abortOnNewBatch) { + // Return a result that will cause another call to append. + return new RecordAppendResult(null, false, false, true, 0); + } - // Update the current time in case the buffer allocation blocked above. - nowMs = time.milliseconds(); - synchronized (dq) { - // Need to check if producer is closed again after grabbing the dequeue lock. - if (closed) - throw new KafkaException("Producer closed while send in progress"); + if (buffer == null) { + byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); + int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); + log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock); + buffer = free.allocate(size, maxTimeToBlock); + } - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... 
+ synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. + if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) + continue; + RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer); + // Don't deallocate this buffer in the finally block as it's being used in the record batch Review Comment: This comment can be a bit confusing since we always call deallocate in the finally block. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -833,6 +1079,32 @@ public void close() { this.free.close(); } + /** + * Partitioner config for built-in paritioner + */ + public static final class PartitionerConfig { + private final boolean enableAdaptivePartitioning; + private final long partitionAvailabilityTimeoutMs; + + /** + * Partitioner config + * + * @param enableAdaptivePartitioning If it's true, partition switching adapts to broker load, otherwise parition Review Comment: typo parition ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ########## @@ -546,6 +554,76 @@ public void testMetadataTopicExpiry() throws Exception { assertTrue(future.isDone(), "Request should be completed"); } + @Test + public void testNodeLatencyStats() throws Exception { + try (Metrics m = new Metrics()) { + // Create a new record accumulator with non-0 partitionAvailabilityTimeoutMs + // otherwise it wouldn't update the stats. 
+ RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(false, 42); + long totalSize = 1024 * 1024; + accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L, + DELIVERY_TIMEOUT_MS, config, m, "producer-metrics", time, apiVersions, null, + new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics")); + + SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); + Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1, + senderMetrics, time, REQUEST_TIMEOUT, 1000L, null, new ApiVersions()); + + // Produce and send batch. + long time1 = time.milliseconds(); + appendToAccumulator(tp0, 0L, "key", "value"); + sender.runOnce(); + assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight."); + + // We were able to send the batch out, so both the ready and drain values should be the same. + RecordAccumulator.NodeLatencyStats stats = accumulator.getNodeLatencyStats(0); + assertEquals(time1, stats.drainTimeMs); + assertEquals(time1, stats.readyTimeMs); + + // Make client not ready. Review Comment: Node 1 not ready? ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -960,8 +1002,10 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } + + // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION, + // which means that the RecordAccumulator would pick a partition based on broker load. Review Comment: The comment is not very accurate. RecordAccumulator only picks a partition based on broker load if enableAdaptivePartitioning is true. 
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Built-in default partitioner. Note, that this is just a utility class that is used directly from + * RecordAccumulator, it does not implement the Partitioner interface. + * + * The class keeps track of various bookkeeping information required for adaptive sticky partitioning + * (described in detail in KIP-794). There is one partitioner object per topic. 
+ */ +public class BuiltInPartitioner { + private final String topic; + private final int stickyBatchSize; + + private volatile PartitionLoadStats partitionLoadStats = null; + private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>(); + + // Visible and used for testing only. + static volatile public Supplier<Integer> mockRandom = null; + + /** + * BuiltInPartitioner constructor. + * + * @param topic The topic + * @param stickyBatchSize How much to produce to partition before switch + */ + public BuiltInPartitioner(String topic, int stickyBatchSize) { + this.topic = topic; + this.stickyBatchSize = stickyBatchSize; + } + + /** + * Calculate the next partition for the topic based on the partition load stats. + */ + private int nextPartition(Cluster cluster) { + int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); + + // Cache volatile variable in local variable. + PartitionLoadStats partitionLoadStats = this.partitionLoadStats; + + if (partitionLoadStats == null) { + // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next + // partition based on uniform distribution. + List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) + return availablePartitions.get(random % availablePartitions.size()).partition(); + + // We don't have available partitions, just pick one among all partitions. + List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); + return random % partitions.size(); + } else { + // Calculate next partition based on load distribution. 
+ assert partitionLoadStats.length > 0; + + int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable; + int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1]; + + // By construction, the cumulative frequency table is sorted, so we can use binary + // search to find the desired index. + int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom); + + // binarySearch results the index of the found element, or -(insertion_point) - 1 + // (where insertion_point is the index of the first element greater than the key). + // We need to get the index of the first value that is strictly greater, which + // would be the insertion point, except if we found the element that's equal to + // the searched value (in this case we need to get next). For example, if we have + // 4 5 8 + // and we're looking for 3, then we'd get the insertion_point = 0, and the function + // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd + // get 0, and we need the next one, so adding 1 works here as well. + int partitionIndex = Math.abs(searchResult + 1); + assert partitionIndex < partitionLoadStats.length; + return partitionLoadStats.partitionIds[partitionIndex]; + } + } + + /** + * Test-only function. When partition load stats are defined, return the end of range for the + * random number. + */ + public int loadStatsRangeEnd() { + assert partitionLoadStats != null; + assert partitionLoadStats.length > 0; + return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1]; + } + + /** + * Peek currently chosen sticky partition. This method works in conjunction with {@link #isPartitionChanged} + * and {@link #updatePartitionInfo}. The workflow is the following: + * + * 1. peekCurrentPartitionInfo is called to know which partition to lock. + * 2. Lock partition's batch queue. + * 3. isPartitionChanged under lock to make sure that nobody raced us. + * 4. 
Append data to buffer. + * 5. updatePartitionInfo to update produced bytes and maybe switch partition. + * + * It's important that steps 3-5 are under partition's batch queue lock. + * + * @param cluster The cluster information (needed if there is no current partition) + * @return sticky partition info object + */ + StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) { + StickyPartitionInfo partitionInfo = stickyPartitionInfo.get(); + if (partitionInfo != null) + return partitionInfo; + + // We're the first to create it. + int partition = nextPartition(cluster); + partitionInfo = new StickyPartitionInfo(partition); + if (stickyPartitionInfo.compareAndSet(null, partitionInfo)) + return partitionInfo; + + // Someone has raced us. + return stickyPartitionInfo.get(); + } + + /** + * Check if partition is changed by a concurrent thread. NOTE this function needs to be called under + * the partition's batch queue lock. + * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @return true if sticky partition object is changed (race condition) + */ + boolean isPartitionChanged(StickyPartitionInfo partitionInfo) { + return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo; Review Comment: Should we assert that partitionInfo is not null? 
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -160,91 +218,156 @@ public double measure(MetricConfig config, long now) { metrics.addMetric(metricName, availableBytes); } + private void setPartition(AppendCallbacks callbacks, int partition) { + if (callbacks != null) + callbacks.setPartition(partition); + } + /** * Add a record to the accumulator, return the append result * <p> * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created * <p> * - * @param tp The topic/partition to which this record is being sent + * @param topic The topic to which this record is being sent + * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION + * if any partition could be used * @param timestamp The timestamp of the record * @param key The key for the record * @param value The value for the record * @param headers the Headers for the record - * @param callback The user-supplied callback to execute when the request is complete + * @param callbacks The callbacks to execute * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and * running the partitioner's onNewBatch method before trying to append again * @param nowMs The current time, in milliseconds + * @param cluster The cluster metadata */ - public RecordAppendResult append(TopicPartition tp, + public RecordAppendResult append(String topic, + int partition, long timestamp, byte[] key, byte[] value, Header[] headers, - Callback callback, + AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, - long nowMs) throws InterruptedException { + long nowMs, + Cluster cluster) throws InterruptedException { + TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(k, batchSize)); + 
// We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { - // check if we have an in-progress batch - Deque<ProducerBatch> dq = getOrCreateDeque(tp); - synchronized (dq) { - if (closed) - throw new KafkaException("Producer closed while send in progress"); - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) - return appendResult; - } + // Loop to retry in case we encounter partitioner's race conditions. + while (true) { + // If the message doesn't have any partition affinity, so we pick a partition based on the broker + // availability and performance. Note, that here we peek current partition before we hold the + // deque lock, so we'll need to make sure that it's not changed while we were waiting for the + // deque lock. + final BuiltInPartitioner.StickyPartitionInfo partitionInfo; + final int effectivePartition; + if (partition == RecordMetadata.UNKNOWN_PARTITION) { + partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster); + effectivePartition = partitionInfo.partition(); + } else { + partitionInfo = null; + effectivePartition = partition; + } - // we don't have an in-progress record batch try to allocate a new batch - if (abortOnNewBatch) { - // Return a result that will cause another call to append. - return new RecordAppendResult(null, false, false, true); - } + // Now that we know the effective partition, let the caller know. + setPartition(callbacks, effectivePartition); + + // check if we have an in-progress batch + Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>()); + synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. 
+ if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) + continue; + RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs); + if (appendResult != null) { + topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster); + return appendResult; + } + } - byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); - int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); - log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock); - buffer = free.allocate(size, maxTimeToBlock); + // we don't have an in-progress record batch try to allocate a new batch + if (abortOnNewBatch) { + // Return a result that will cause another call to append. + return new RecordAppendResult(null, false, false, true, 0); + } - // Update the current time in case the buffer allocation blocked above. - nowMs = time.milliseconds(); - synchronized (dq) { - // Need to check if producer is closed again after grabbing the dequeue lock. - if (closed) - throw new KafkaException("Producer closed while send in progress"); + if (buffer == null) { + byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); + int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); + log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock); + buffer = free.allocate(size, maxTimeToBlock); + } - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... 
+ synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. + if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) + continue; + RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer); + // Don't deallocate this buffer in the finally block as it's being used in the record batch + if (appendResult.newBatchCreated) + buffer = null; + topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster); return appendResult; } - - MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); - ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs); - FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, - callback, nowMs)); - - dq.addLast(batch); - incomplete.add(batch); - - // Don't deallocate this buffer in the finally block as it's being used in the record batch - buffer = null; - return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false); } } finally { - if (buffer != null) - free.deallocate(buffer); + free.deallocate(buffer); appendsInProgress.decrementAndGet(); } } + /** + * Append a new batch to the queue + * + * @param topic The topic + * @param partition The partition (cannot be RecordMetadata.UNKNOWN_PARTITION) + * @param dq The queue + * @param timestamp The timestamp of the record + * @param key The key for the record + * @param value The value for the record + * @param headers the Headers for the record + * @param callbacks The callbacks to execute + * @param buffer The buffer for the new batch + */ + private RecordAppendResult appendNewBatch(String topic, + int partition, + Deque<ProducerBatch> dq, + long timestamp, + byte[] key, + byte[] value, + Header[] headers, + AppendCallbacks callbacks, + ByteBuffer buffer) throws InterruptedException { + assert partition != 
RecordMetadata.UNKNOWN_PARTITION; + + // Update the current time in case the buffer allocation blocked above. Review Comment: We used to check if the producer is closed here. Is that still needed? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java: ########## @@ -25,12 +23,15 @@ private final Cluster cluster; private final Serializer<K> keySerializer; - private final DefaultPartitioner defaultPartitioner; + @SuppressWarnings("deprecation") + private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner; + + @SuppressWarnings("deprecation") public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) { this.cluster = cluster; this.keySerializer = keySerializer; - this.defaultPartitioner = new DefaultPartitioner(); + this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner(); Review Comment: I am wondering if the DefaultStreamPartitioner should use the built-in partitioner. @guozhangwang : What do you think? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -833,6 +1079,32 @@ public void close() { this.free.close(); } + /** + * Partitioner config for built-in paritioner Review Comment: typo paritioner ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -833,6 +1079,32 @@ public void close() { this.free.close(); } + /** + * Partitioner config for built-in paritioner + */ + public static final class PartitionerConfig { + private final boolean enableAdaptivePartitioning; + private final long partitionAvailabilityTimeoutMs; + + /** + * Partitioner config + * + * @param enableAdaptivePartitioning If it's true, partition switching adapts to broker load, otherwise parition + * swiching is random.
Review Comment: typo swiching ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -160,91 +218,156 @@ public double measure(MetricConfig config, long now) { metrics.addMetric(metricName, availableBytes); } + private void setPartition(AppendCallbacks callbacks, int partition) { + if (callbacks != null) + callbacks.setPartition(partition); + } + /** * Add a record to the accumulator, return the append result * <p> * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created * <p> * - * @param tp The topic/partition to which this record is being sent + * @param topic The topic to which this record is being sent + * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION + * if any partition could be used * @param timestamp The timestamp of the record * @param key The key for the record * @param value The value for the record * @param headers the Headers for the record - * @param callback The user-supplied callback to execute when the request is complete + * @param callbacks The callbacks to execute * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and * running the partitioner's onNewBatch method before trying to append again * @param nowMs The current time, in milliseconds + * @param cluster The cluster metadata */ - public RecordAppendResult append(TopicPartition tp, + public RecordAppendResult append(String topic, + int partition, long timestamp, byte[] key, byte[] value, Header[] headers, - Callback callback, + AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, - long nowMs) throws InterruptedException { + long nowMs, + Cluster cluster) throws InterruptedException { + TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> 
new TopicInfo(k, batchSize)); + // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { - // check if we have an in-progress batch - Deque<ProducerBatch> dq = getOrCreateDeque(tp); - synchronized (dq) { - if (closed) - throw new KafkaException("Producer closed while send in progress"); - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) - return appendResult; - } + // Loop to retry in case we encounter partitioner's race conditions. + while (true) { + // If the message doesn't have any partition affinity, so we pick a partition based on the broker + // availability and performance. Note, that here we peek current partition before we hold the + // deque lock, so we'll need to make sure that it's not changed while we were waiting for the + // deque lock. + final BuiltInPartitioner.StickyPartitionInfo partitionInfo; + final int effectivePartition; + if (partition == RecordMetadata.UNKNOWN_PARTITION) { + partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster); + effectivePartition = partitionInfo.partition(); + } else { + partitionInfo = null; + effectivePartition = partition; + } - // we don't have an in-progress record batch try to allocate a new batch - if (abortOnNewBatch) { - // Return a result that will cause another call to append. - return new RecordAppendResult(null, false, false, true); - } + // Now that we know the effective partition, let the caller know. + setPartition(callbacks, effectivePartition); + + // check if we have an in-progress batch + Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>()); + synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. 
+ if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) + continue; + RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs); + if (appendResult != null) { + topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster); + return appendResult; + } + } - byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); - int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); - log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock); - buffer = free.allocate(size, maxTimeToBlock); + // we don't have an in-progress record batch try to allocate a new batch + if (abortOnNewBatch) { + // Return a result that will cause another call to append. + return new RecordAppendResult(null, false, false, true, 0); + } - // Update the current time in case the buffer allocation blocked above. - nowMs = time.milliseconds(); - synchronized (dq) { - // Need to check if producer is closed again after grabbing the dequeue lock. - if (closed) - throw new KafkaException("Producer closed while send in progress"); + if (buffer == null) { + byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); + int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); + log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock); + buffer = free.allocate(size, maxTimeToBlock); + } - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... 
+ synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. + if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) + continue; + RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer); + // Don't deallocate this buffer in the finally block as it's being used in the record batch + if (appendResult.newBatchCreated) + buffer = null; + topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster); return appendResult; } - - MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); - ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs); - FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, - callback, nowMs)); - - dq.addLast(batch); - incomplete.add(batch); - - // Don't deallocate this buffer in the finally block as it's being used in the record batch - buffer = null; - return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false); } } finally { - if (buffer != null) - free.deallocate(buffer); + free.deallocate(buffer); appendsInProgress.decrementAndGet(); } } + /** + * Append a new batch to the queue + * + * @param topic The topic + * @param partition The partition (cannot be RecordMetadata.UNKNOWN_PARTITION) + * @param dq The queue + * @param timestamp The timestamp of the record + * @param key The key for the record + * @param value The value for the record + * @param headers the Headers for the record + * @param callbacks The callbacks to execute + * @param buffer The buffer for the new batch + */ + private RecordAppendResult appendNewBatch(String topic, + int partition, + Deque<ProducerBatch> dq, + long timestamp, + byte[] key, + byte[] value, + Header[] headers, + AppendCallbacks callbacks, + ByteBuffer buffer) throws InterruptedException { Review Comment: It seems this method doesn't throw 
InterruptedException? ########## streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java: ########## @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; Review Comment: There is also a reference to DefaultPartitioner in the javadoc below. ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ########## @@ -1017,6 +1024,158 @@ public void testStickyBatches() throws Exception { assertEquals(appends, 2 * expectedAppends); } + @Test + public void testUniformBuiltInPartitioner() throws Exception { + + try { + // Mock random number generator with just sequential integer. + AtomicInteger mockRandom = new AtomicInteger(); + BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1); + + long totalSize = 1024 * 1024; + int batchSize = 128; // note that this is also a "sticky" limit for the partitioner + RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 0); + + // Set up callbacks so that we know what partition is chosen. + final int[] partition = {RecordMetadata.UNKNOWN_PARTITION}; Review Comment: Do we need an array since we only return one partition? ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.record.Record; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BuiltInPartitionerTest { + private final static Node[] NODES = new Node[] { + new Node(0, "localhost", 99), + new Node(1, "localhost", 100), + new Node(2, "localhost", 101), + new Node(11, "localhost", 102) + }; + final static String TOPIC_A = "topicA"; + final static String TOPIC_B = "topicB"; + final static String TOPIC_C = "topicC"; + final static Header[] EMPTY_HEADERS = Record.EMPTY_HEADERS; Review Comment: EMPTY_HEADERS is no longer referenced. 
########## clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java: ########## @@ -117,10 +116,24 @@ public MockProducer(final Cluster cluster, * * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)} */ + @SuppressWarnings("deprecation") public MockProducer(final boolean autoComplete, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { - this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer); + this(Cluster.empty(), autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer); Review Comment: Should the partitioner be the built-in one now? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Built-in default partitioner. Note, that this is just a utility class that is used directly from + * RecordAccumulator, it does not implement the Partitioner interface. + * + * The class keeps track of various bookkeeping information required for adaptive sticky partitioning + * (described in detail in KIP-794). There is one partitioner object per topic. + */ +public class BuiltInPartitioner { + private final String topic; + private final int stickyBatchSize; + + private volatile PartitionLoadStats partitionLoadStats = null; + private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>(); + + // Visible and used for testing only. + static volatile public Supplier<Integer> mockRandom = null; + + /** + * BuiltInPartitioner constructor. + * + * @param topic The topic + * @param stickyBatchSize How much to produce to partition before switch + */ + public BuiltInPartitioner(String topic, int stickyBatchSize) { + this.topic = topic; + this.stickyBatchSize = stickyBatchSize; + } + + /** + * Calculate the next partition for the topic based on the partition load stats. + */ + private int nextPartition(Cluster cluster) { + int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); + + // Cache volatile variable in local variable. 
+ PartitionLoadStats partitionLoadStats = this.partitionLoadStats; + + if (partitionLoadStats == null) { + // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next + // partition based on uniform distribution. + List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) + return availablePartitions.get(random % availablePartitions.size()).partition(); + + // We don't have available partitions, just pick one among all partitions. + List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); + return random % partitions.size(); + } else { + // Calculate next partition based on load distribution. + assert partitionLoadStats.length > 0; + + int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable; + int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1]; + + // By construction, the cumulative frequency table is sorted, so we can use binary + // search to find the desired index. + int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom); + + // binarySearch results the index of the found element, or -(insertion_point) - 1 + // (where insertion_point is the index of the first element greater than the key). + // We need to get the index of the first value that is strictly greater, which + // would be the insertion point, except if we found the element that's equal to + // the searched value (in this case we need to get next). For example, if we have + // 4 5 8 + // and we're looking for 3, then we'd get the insertion_point = 0, and the function + // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd + // get 0, and we need the next one, so adding 1 works here as well. 
+ int partitionIndex = Math.abs(searchResult + 1); + assert partitionIndex < partitionLoadStats.length; + return partitionLoadStats.partitionIds[partitionIndex]; + } + } + + /** + * Test-only function. When partition load stats are defined, return the end of range for the + * random number. + */ + public int loadStatsRangeEnd() { + assert partitionLoadStats != null; + assert partitionLoadStats.length > 0; + return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1]; + } + + /** + * Peek currently chosen sticky partition. This method works in conjunction with {@link #isPartitionChanged} + * and {@link #updatePartitionInfo}. The workflow is the following: + * + * 1. peekCurrentPartitionInfo is called to know which partition to lock. + * 2. Lock partition's batch queue. + * 3. isPartitionChanged under lock to make sure that nobody raced us. + * 4. Append data to buffer. + * 5. updatePartitionInfo to update produced bytes and maybe switch partition. + * + * It's important that steps 3-5 are under partition's batch queue lock. + * + * @param cluster The cluster information (needed if there is no current partition) + * @return sticky partition info object + */ + StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) { + StickyPartitionInfo partitionInfo = stickyPartitionInfo.get(); + if (partitionInfo != null) + return partitionInfo; + + // We're the first to create it. + int partition = nextPartition(cluster); + partitionInfo = new StickyPartitionInfo(partition); + if (stickyPartitionInfo.compareAndSet(null, partitionInfo)) + return partitionInfo; + + // Someone has raced us. + return stickyPartitionInfo.get(); + } + + /** + * Check if partition is changed by a concurrent thread. NOTE this function needs to be called under + * the partition's batch queue lock. 
+ * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @return true if sticky partition object is changed (race condition) + */ + boolean isPartitionChanged(StickyPartitionInfo partitionInfo) { + return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo; + } + + /** + * Update partition info with the number of bytes appended and maybe switch partition. + * NOTE this function needs to be called under the partition's batch queue lock. + * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @param appendedBytes The number of bytes appended to this partition + * @param cluster The cluster information + */ + void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) { + if (partitionInfo == null) Review Comment: Should we assert that partitionInfo is not null? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Built-in default partitioner. Note, that this is just a utility class that is used directly from + * RecordAccumulator, it does not implement the Partitioner interface. + * + * The class keeps track of various bookkeeping information required for adaptive sticky partitioning + * (described in detail in KIP-794). There is one partitioner object per topic. + */ +public class BuiltInPartitioner { + private final String topic; + private final int stickyBatchSize; + + private volatile PartitionLoadStats partitionLoadStats = null; + private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>(); + + // Visible and used for testing only. + static volatile public Supplier<Integer> mockRandom = null; + + /** + * BuiltInPartitioner constructor. + * + * @param topic The topic + * @param stickyBatchSize How much to produce to partition before switch + */ + public BuiltInPartitioner(String topic, int stickyBatchSize) { + this.topic = topic; + this.stickyBatchSize = stickyBatchSize; + } + + /** + * Calculate the next partition for the topic based on the partition load stats. + */ + private int nextPartition(Cluster cluster) { + int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); + + // Cache volatile variable in local variable. 
+ PartitionLoadStats partitionLoadStats = this.partitionLoadStats; + + if (partitionLoadStats == null) { + // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next + // partition based on uniform distribution. + List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) + return availablePartitions.get(random % availablePartitions.size()).partition(); + + // We don't have available partitions, just pick one among all partitions. + List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); + return random % partitions.size(); + } else { + // Calculate next partition based on load distribution. + assert partitionLoadStats.length > 0; + + int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable; Review Comment: In the adaptive case, it seems that we should also avoid selecting partitions with no leader, if possible? ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ########## @@ -467,22 +466,31 @@ public void testAppendInExpiryCallback() throws InterruptedException { final byte[] key = "key".getBytes(); final byte[] value = "value".getBytes(); final long maxBlockTimeMs = 1000; - Callback callback = (metadata, exception) -> { - if (exception instanceof TimeoutException) { - expiryCallbackCount.incrementAndGet(); - try { - accumulator.append(tp1, 0L, key, value, - Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - } catch (InterruptedException e) { - throw new RuntimeException("Unexpected interruption", e); - } - } else if (exception != null) - unexpectedException.compareAndSet(null, exception); + Cluster cluster = TestUtils.singletonCluster(); + RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() { + @Override + public void setPartition(int partition) { + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception 
exception) { + if (exception instanceof TimeoutException) { + expiryCallbackCount.incrementAndGet(); + try { + accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, + Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + } catch (InterruptedException e) { + throw new RuntimeException("Unexpected interruption", e); + } + } else if (exception != null) + unexpectedException.compareAndSet(null, exception); + Review Comment: extra empty line. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Built-in default partitioner. 
Note, that this is just a utility class that is used directly from + * RecordAccumulator, it does not implement the Partitioner interface. + * + * The class keeps track of various bookkeeping information required for adaptive sticky partitioning + * (described in detail in KIP-794). There is one partitioner object per topic. + */ +public class BuiltInPartitioner { + private final String topic; + private final int stickyBatchSize; + + private volatile PartitionLoadStats partitionLoadStats = null; + private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>(); + + // Visible and used for testing only. + static volatile public Supplier<Integer> mockRandom = null; + + /** + * BuiltInPartitioner constructor. + * + * @param topic The topic + * @param stickyBatchSize How much to produce to partition before switch + */ + public BuiltInPartitioner(String topic, int stickyBatchSize) { + this.topic = topic; + this.stickyBatchSize = stickyBatchSize; + } + + /** + * Calculate the next partition for the topic based on the partition load stats. + */ + private int nextPartition(Cluster cluster) { + int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); + + // Cache volatile variable in local variable. + PartitionLoadStats partitionLoadStats = this.partitionLoadStats; + + if (partitionLoadStats == null) { + // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next + // partition based on uniform distribution. + List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) + return availablePartitions.get(random % availablePartitions.size()).partition(); + + // We don't have available partitions, just pick one among all partitions. 
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); + return random % partitions.size(); + } else { + // Calculate next partition based on load distribution. + assert partitionLoadStats.length > 0; + + int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable; + int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1]; + + // By construction, the cumulative frequency table is sorted, so we can use binary + // search to find the desired index. + int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom); + + // binarySearch results the index of the found element, or -(insertion_point) - 1 + // (where insertion_point is the index of the first element greater than the key). + // We need to get the index of the first value that is strictly greater, which + // would be the insertion point, except if we found the element that's equal to + // the searched value (in this case we need to get next). For example, if we have + // 4 5 8 + // and we're looking for 3, then we'd get the insertion_point = 0, and the function + // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd + // get 0, and we need the next one, so adding 1 works here as well. + int partitionIndex = Math.abs(searchResult + 1); + assert partitionIndex < partitionLoadStats.length; + return partitionLoadStats.partitionIds[partitionIndex]; + } + } + + /** + * Test-only function. When partition load stats are defined, return the end of range for the + * random number. + */ + public int loadStatsRangeEnd() { + assert partitionLoadStats != null; + assert partitionLoadStats.length > 0; + return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1]; + } + + /** + * Peek currently chosen sticky partition. This method works in conjunction with {@link #isPartitionChanged} + * and {@link #updatePartitionInfo}. The workflow is the following: + * + * 1. 
peekCurrentPartitionInfo is called to know which partition to lock. + * 2. Lock partition's batch queue. + * 3. isPartitionChanged under lock to make sure that nobody raced us. + * 4. Append data to buffer. + * 5. updatePartitionInfo to update produced bytes and maybe switch partition. + * + * It's important that steps 3-5 are under partition's batch queue lock. + * + * @param cluster The cluster information (needed if there is no current partition) + * @return sticky partition info object + */ + StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) { + StickyPartitionInfo partitionInfo = stickyPartitionInfo.get(); + if (partitionInfo != null) + return partitionInfo; + + // We're the first to create it. + int partition = nextPartition(cluster); + partitionInfo = new StickyPartitionInfo(partition); + if (stickyPartitionInfo.compareAndSet(null, partitionInfo)) + return partitionInfo; + + // Someone has raced us. + return stickyPartitionInfo.get(); + } + + /** + * Check if partition is changed by a concurrent thread. NOTE this function needs to be called under + * the partition's batch queue lock. + * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @return true if sticky partition object is changed (race condition) + */ + boolean isPartitionChanged(StickyPartitionInfo partitionInfo) { + return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo; + } + + /** + * Update partition info with the number of bytes appended and maybe switch partition. + * NOTE this function needs to be called under the partition's batch queue lock. 
+ * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @param appendedBytes The number of bytes appended to this partition + * @param cluster The cluster information + */ + void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) { + if (partitionInfo == null) + return; + + assert partitionInfo == stickyPartitionInfo.get(); + int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes); + if (producedBytes >= stickyBatchSize) { + // We've produced enough to this partition, switch to next. + int partition = nextPartition(cluster); + StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(partition); + stickyPartitionInfo.set(newPartitionInfo); + } + } + + /** + * Update partition load stats from the queue sizes of each partition. + * NOTE: queueSizes are modified in place to avoid allocations + * + * @param queueSizes The queue sizes + * @param partitionIds The partition ids for the queues + * @param length The logical length of the arrays (could be less): we may eliminate some partitions + * based on latency, but to avoid reallocation of the arrays, we just decrement + * logical length + * Visible for testing + */ + public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) { + if (queueSizes == null) { + partitionLoadStats = null; + return; + } + assert queueSizes.length == partitionIds.length; + assert length <= queueSizes.length; + + // The queueSizes.length represents the number of all partitions in the topic and if we have + // less than 2 partitions, there is no need to do adaptive logic. + // If partitioner.availability.timeout.ms != 0, then partitions that experience high latencies + // (greater than partitioner.availability.timeout.ms) may be excluded, the length represents + // partitions that are not excluded. 
If some partitions were excluded, we'd still want to + // go through adaptive logic, even if we have one partition. + // See also RecordAccumulator#partitionReady where the queueSizes are built. + if (length < 1 || queueSizes.length < 2) { Review Comment: Would it be useful to add debug-level logging if length is less than queueSizes.length? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org