[GitHub] [kafka] junrao commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r971047147

## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##
@@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 assertEquals(1, mockRandom.get());
 // Produce large record, we should exceed "sticky" limit, but produce to this partition
-// as we switch after the "sticky" limit is exceeded. The partition is switched after
-// we produce.
+// as we try to switch after the "sticky" limit is exceeded. The switch is disabled
+// because of incomplete batch.
 byte[] largeValue = new byte[batchSize];
 accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
 assertEquals(partition1, partition.get());
-assertEquals(2, mockRandom.get());
+assertEquals(1, mockRandom.get());
-// Produce large record, we should switch to next partition.
+// Produce large record, we switched to next partition by previous produce, but

Review Comment:
To be precise, the previous produce didn't switch to the next partition. The produce of this record forces the closing of the current batch, which causes the switch to the next partition.

## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##
@@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 assertEquals(1, mockRandom.get());
 // Produce large record, we should exceed "sticky" limit, but produce to this partition
-// as we switch after the "sticky" limit is exceeded. The partition is switched after
-// we produce.
+// as we try to switch after the "sticky" limit is exceeded. The switch is disabled
+// because of incomplete batch.

Review Comment:
Thanks for the explanation, Artem. This makes sense to me now.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r970263402

## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##
@@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 assertEquals(1, mockRandom.get());
 // Produce large record, we should exceed "sticky" limit, but produce to this partition
-// as we switch after the "sticky" limit is exceeded. The partition is switched after
-// we produce.
+// as we try to switch after the "sticky" limit is exceeded. The switch is disabled
+// because of incomplete batch.

Review Comment:
I am trying to understand why the switch is disabled here. It seems that the large record won't fit in the current batch, so we will put the large record in a new batch. This batch will then be full, since the large record has more than batch.size bytes in it. This should allow the switch, right?
[GitHub] [kafka] junrao commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r970071437

## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##
@@ -1137,23 +1137,26 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 assertEquals(partition1, partition.get());
 assertEquals(2, mockRandom.get());
-// Produce large record, we should switch to next partition.
+// Produce large record, we switched to next partition by previous produce, but
+// for this produce the switch would be disabled because of incomplete batch.
 accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
 assertEquals(partition2, partition.get());
-assertEquals(3, mockRandom.get());
+assertEquals(2, mockRandom.get());

Review Comment:
Thanks, Artem. Sorry, I still don't fully understand. After step 3, it seems that we switched to partition 2 after the append, since mockRandom is 2, right? That part makes sense to me. In step 4, we append to a new batch in partition 2. After the append(), it seems that enableSwitch should be true since `last.isFull()` should be true. Then, in `topicInfo.builtInPartitioner.updatePartitionInfo`, `producedBytes >= stickyBatchSize && enableSwitch` should be true, which will trigger partition switching. What am I missing here?
[GitHub] [kafka] junrao commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r967484263

## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ##
@@ -378,6 +415,15 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
 }
 /**
+ * Check if all batches in the queue are full.
+ */
+private boolean allBatchesFull(Deque<ProducerBatch> deque) {

Review Comment:
allBatchesFull => lastBatchFull?

## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ##
@@ -170,13 +170,49 @@ boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
 * @param cluster The cluster information
 */
 void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) {
+updatePartitionInfo(partitionInfo, appendedBytes, cluster, true);
+}
+
+/**
+ * Update partition info with the number of bytes appended and maybe switch partition.
+ * NOTE this function needs to be called under the partition's batch queue lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+ * @param appendedBytes The number of bytes appended to this partition
+ * @param cluster The cluster information
+ * @param enableSwitch If true, switch partition once produced enough bytes
+ */
+void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster, boolean enableSwitch) {
 // partitionInfo may be null if the caller didn't use built-in partitioner.
 if (partitionInfo == null) return;
 assert partitionInfo == stickyPartitionInfo.get();
 int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);
-if (producedBytes >= stickyBatchSize) {
+
+// We're trying to switch partition once we produce stickyBatchSize bytes to a partition
+// but doing so may hinder batching because partition switch may happen while batch isn't
+// ready to send. This situation is especially likely with high linger.ms setting.
+// Consider the following example:
+// linger.ms=500, producer produces 12KB in 500ms, batch.size=16KB
+// - first batch collects 12KB in 500ms, gets sent
+// - second batch collects 4KB, then we switch partition, so 4KB gets eventually sent
+// - ... and so on - we'd get 12KB and 4KB batches
+// To get more optimal batching and avoid 4KB fractional batches, the caller may disallow
+// partition switch if batch is not ready to send, so with the example above we'd avoid
+// fractional 4KB batches: in that case the scenario would look like this:
+// - first batch collects 12KB in 500ms, gets sent
+// - second batch collects 4KB, but partition switch doesn't happen because batch is not ready
+// - second batch collects 12KB in 500ms, gets sent and now we switch partition.
+// - ... and so on - we'd just send 12KB batches
+// We cap the produced bytes to not exceed 2x of the batch size to avoid pathological cases
+// (e.g. if we have a mix of keyed and unkeyed messages, keyed messages may create an
+// unready batch after the batch that disabled partition switch becomes ready).
+// As a result, with high linger.ms setting we end up switching partitions after producing
+// between stickyBatchSize and stickyBatchSize * 2 bytes, to better align with batch boundary.
+if (producedBytes >= stickyBatchSize * 2)
+log.trace("Exceeded {} bytes, produced {} bytes, enable is {}", stickyBatchSize * 2, producedBytes, enableSwitch);

Review Comment:
Could we rephrase this to be more readable? Something like "Produced $producedBytes bytes, exceeding twice the batch size of $stickyBatchSize, with switching set to $enableSwitch".
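The 12KB/4KB example in the `updatePartitionInfo` comment can be reproduced with a small byte-counting model. This is a hypothetical sketch, not Kafka's `BuiltInPartitioner`: `StickySwitchSim` and `simulate` are made-up names, and the model assumes 1 KB records at a constant rate (so 12 KB arrive per `linger.ms` window), `batch.size` = `stickyBatchSize` = 16 KB, and that a batch is sent as soon as it is full or its linger window expires. With `gateOnReadyBatch` off it switches after 16 KB regardless of the open batch; with it on, it switches only when no incomplete batch would be left behind, or once 2x the sticky size has been produced.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical byte-counting model of the sticky-partition switching rule; not Kafka code.
// Assumptions: 1 KB records at a constant rate, 12 KB arrive per linger.ms window,
// batch.size == stickyBatchSize == 16 KB, and a batch is sent once full or lingered.
public class StickySwitchSim {
    static final int STICKY_BATCH_KB = 16;  // batch.size, also used as stickyBatchSize
    static final int LINGER_WINDOW_KB = 12; // KB that arrive during one linger.ms window

    /** Returns the sizes (KB) of the batches sent while producing {@code records} 1 KB records. */
    public static List<Integer> simulate(int records, boolean gateOnReadyBatch) {
        List<Integer> sent = new ArrayList<>();
        int producedKb = 0;  // KB produced to the current sticky partition
        int openBatchKb = 0; // size of the open batch in the current partition
        for (int i = 0; i < records; i++) {
            openBatchKb++;
            producedKb++;
            // The open batch is sent when it is full or its linger window has elapsed.
            if (openBatchKb >= STICKY_BATCH_KB || openBatchKb >= LINGER_WINDOW_KB) {
                sent.add(openBatchKb);
                openBatchKb = 0;
            }
            // Gated variant: only allow a switch when no incomplete batch is left behind.
            boolean enableSwitch = !gateOnReadyBatch || openBatchKb == 0;
            if ((producedKb >= STICKY_BATCH_KB && enableSwitch) || producedKb >= 2 * STICKY_BATCH_KB) {
                // A partial batch left behind is sent later, when its linger window expires.
                if (openBatchKb > 0) {
                    sent.add(openBatchKb);
                }
                openBatchKb = 0;
                producedKb = 0; // the new sticky partition starts counting from zero
            }
        }
        if (openBatchKb > 0) {
            sent.add(openBatchKb); // flush whatever is still open at the end
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("ungated: " + simulate(48, false));
        System.out.println("gated:   " + simulate(48, true));
    }
}
```

Running `main` prints `[12, 4, 12, 4, 12, 4]` for the ungated rule and `[12, 12, 12, 12]` for the gated one, matching the two scenarios in the code comment. In this scenario the gated rule switches at 24 KB, so the 2x cap (32 KB) never fires; treating linger expiry as "12 records later" is only valid because the arrival rate is assumed constant.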
[GitHub] [kafka] junrao commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r964182091

## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ##
@@ -378,6 +415,16 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
 }
 /**
+ * Check if there are ready batches in the queue, or we sent all batches.
+ */
+private boolean queueHasReadyBatches(Deque<ProducerBatch> deque, long nowMs) {
+// Note that we also check if the queue is empty, because that may mean that batches became
+// ready and we sent them.
+ProducerBatch last = deque.peekLast();
+return deque.size() > 1 || last == null || last.isFull() || last.waitedTimeMs(nowMs) >= lingerMs;

Review Comment:
Here, we are specifically handling the case with lingerMs > 0. I am wondering if the same issue described in the jira could occur with lingerMs equal to 0. With lingerMs = 0, because of back pressure, the effective batch size is typically between 1 and batch.size. With the built-in partitioner, we could still have the issue with a large batch followed by a small one.
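The lingerMs = 0 concern can be seen directly in the quoted predicate: a waited time is never negative, so with `lingerMs = 0` the last disjunct is always true and the check never disables switching. The sketch below copies the boolean expression from the diff but evaluates it against a hypothetical `FakeBatch` stand-in (not Kafka's `ProducerBatch`), assuming `waitedTimeMs` means `max(0, nowMs - createdMs)`; `lingerMs` is passed as a parameter instead of being read from a field.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReadyCheckSketch {
    // Hypothetical stand-in for ProducerBatch: only the two properties the check reads.
    public static final class FakeBatch {
        private final boolean full;
        private final long createdMs;

        public FakeBatch(boolean full, long createdMs) {
            this.full = full;
            this.createdMs = createdMs;
        }

        boolean isFull() {
            return full;
        }

        // Assumed semantics: time since creation, clamped at zero.
        long waitedTimeMs(long nowMs) {
            return Math.max(0, nowMs - createdMs);
        }
    }

    // Same boolean expression as the quoted queueHasReadyBatches, with lingerMs as a parameter.
    public static boolean queueHasReadyBatches(Deque<FakeBatch> deque, long nowMs, long lingerMs) {
        FakeBatch last = deque.peekLast();
        return deque.size() > 1 || last == null || last.isFull() || last.waitedTimeMs(nowMs) >= lingerMs;
    }

    public static void main(String[] args) {
        Deque<FakeBatch> dq = new ArrayDeque<>();
        dq.addLast(new FakeBatch(false, 100)); // one incomplete batch created at t = 100 ms

        // linger.ms = 500: the batch has waited 10 ms, so it is not ready and switching stays off.
        System.out.println(queueHasReadyBatches(dq, 110, 500)); // false
        // linger.ms = 0: waitedTimeMs >= 0 always holds, so even a fresh batch counts as ready.
        System.out.println(queueHasReadyBatches(dq, 100, 0)); // true
    }
}
```

Because any incomplete batch passes this check when lingerMs is 0, the predicate by itself would not prevent the "large batch followed by a small one" pattern raised in the comment.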