artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r861185234


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that 
is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for 
adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per 
topic.
+ */
+public class BuiltInPartitioner {
+    private final String topic;
+    private final int stickyBatchSize;
+    private final CompressionType compression;
+    private final ApiVersions apiVersions;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = 
new AtomicReference<>();
+
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     * @param compression The compression codec for the records
+     * @param apiVersions Request API versions for current connected brokers
+     */
+    public BuiltInPartitioner(String topic,
+                              int stickyBatchSize,
+                              CompressionType compression,
+                              ApiVersions apiVersions) {
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+        this.compression = compression;
+        this.apiVersions = apiVersions;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load 
stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : 
Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's 
disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = 
cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0)
+                return availablePartitions.get(random % 
availablePartitions.size()).partition();
+
+            // We don't have available partitions, just pick one among all 
partitions.
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            return random % partitions.size();
+        } else {
+            // Calculate next partition based on load distribution.
+            assert partitionLoadStats.length > 0;
+
+            int[] probabilityWeights = partitionLoadStats.probabilityWeights;
+            int weightedRandom = random % 
probabilityWeights[partitionLoadStats.length - 1];
+
+            // By construction, the CDF separators are sorted, so we can use 
binary
+            // search to find the desired index.
+            int searchResult = Arrays.binarySearch(probabilityWeights, 0, 
partitionLoadStats.length, weightedRandom);
+
+            // binarySearch results the index of the found element, or 
-(insertion_point) - 1
+            // (where insertion_point is the index of the first element 
greater than the key).
+            // We need to get the index of the first value that is strictly 
greater, which
+            // would be the insertion point, except if we found the element 
that's equal to
+            // the searched value (in this case we need to get next).  For 
example, if we have
+            //  4 5 8
+            // and we're looking for 3, then we'd get the insertion_point = 0, 
and the function
+            // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're 
looking for 4, we'd
+            // get 0, and we need the next one, so adding 1 works here as well.
+            int partitionIndex = Math.abs(searchResult + 1);
+            assert partitionIndex < partitionLoadStats.length;
+            return partitionLoadStats.partitionIds[partitionIndex];
+        }
+    }
+
+    /**
+     * Test-only function.  When partition load stats are defined, return the 
end of range for the
+     * random number.
+     */
+    public int getLoadStatsRangeEnd() {
+        assert partitionLoadStats != null;
+        assert partitionLoadStats.length > 0;
+        return partitionLoadStats.probabilityWeights[partitionLoadStats.length 
- 1];
+    }
+
+    /**
+     * Calculate the partition trying to optimize for batching and broker load.
+     * We keep track of bytes produced to partition and switch to a new one 
only after a certain amount of
+     * bytes has been produced (a.k.a. "sticky" partitioning logic).
+     *
+     * @param key The record key
+     * @param value The record value
+     * @param headers The record header
+     * @param byteSizeStatsMap The map partition -> byte size stats
+     * @param cluster The cluster information
+     * @return The partition to use for this record
+     */
+    public int partition(byte[] key, byte[] value, Header[] headers,
+                         ConcurrentMap<Integer, PartitionByteSizeStats> 
byteSizeStatsMap, Cluster cluster) {
+        // Loop to retry if our atomic ops are raced.
+        while (true) {
+            StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+            if (partitionInfo == null || partitionInfo.producedBytes.get() >= 
stickyBatchSize) {
+                // The partition has exceeded the "stickiness" limit, need to 
switch.
+                int partition = nextPartition(cluster);
+                StickyPartitionInfo newPartitionInfo = new 
StickyPartitionInfo(partition);
+                if (!stickyPartitionInfo.compareAndSet(partitionInfo, 
newPartitionInfo)) {
+                    // We've got raced, retry.
+                    continue;
+                }
+                partitionInfo = newPartitionInfo;
+            }
+
+            // Try to update bookkeeping information for the partition.
+            final int recordSize = 
estimateRecordSize(byteSizeStatsMap.get(partitionInfo.index), key, value, 
headers);

Review Comment:
   Re-implemented this logic as suggested 
https://github.com/apache/kafka/pull/12049/commits/001380101bdcc6ad1147bcb0b1f8174fc12beb87.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to