[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…
mehbey commented on code in PR #14135: URL: https://github.com/apache/kafka/pull/14135#discussion_r1290912365

## core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:

@@ -120,17 +121,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { } } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)

Review Comment: Thank you for clarifying that. I have added the `quorum` param and verified that the test name is updated at runtime as you described. Here is what I captured from the test run output:
```
Gradle Test Run :core:test > Gradle Test Executor 3 > PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long).quorum=zk PASSED
Gradle Test Run :core:test > Gradle Test Executor 3 > PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long).quorum=zk PASSED
Gradle Test Run :core:test > Gradle Test Executor 3 > PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long).quorum=zk PASSED
Gradle Test Run :core:test > Gradle Test Executor 3 > PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long).quorum=kraft PASSED
Gradle Test Run :core:test > Gradle Test Executor 3 > PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long).quorum=kraft PASSED
Gradle Test Run :core:test > Gradle Test Executor 3 > PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, long).quorum=kraft PASSED
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
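For readers unfamiliar with how the `quorum=zk` / `quorum=kraft` suffix lands in the rendered test name: JUnit 5 expands a name template at runtime, substituting `{displayName}` and positional argument placeholders. A minimal, self-contained illustration of that substitution follows; the template string `"{displayName}.quorum={0}"` is an assumption inferred from the rendered names in the log above, not necessarily Kafka's exact `TestInfoUtils` constant.

```java
public class NameTemplateSketch {
    // Expand a JUnit-5-style name template: replace {displayName} with the
    // method's display name and {i} with the i-th parameterized argument.
    public static String render(String template, String displayName, String... args) {
        String name = template.replace("{displayName}", displayName);
        for (int i = 0; i < args.length; i++) {
            name = name.replace("{" + i + "}", args[i]);
        }
        return name;
    }

    public static void main(String[] args) {
        String rendered = render("{displayName}.quorum={0}",
                "testSendWithInvalidBeforeAndAfterTimestamp(String, String, long)", "kraft");
        // -> testSendWithInvalidBeforeAndAfterTimestamp(String, String, long).quorum=kraft
        System.out.println(rendered);
    }
}
```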
[GitHub] [kafka] mehbey commented on pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…
mehbey commented on PR #14135: URL: https://github.com/apache/kafka/pull/14135#issuecomment-1674232688 > Whenever we are making customer facing changes (adding new config in this scenario), we usually mention it in upgrade notes at > > https://github.com/apache/kafka/blob/ddeb89f4a9f31b5ff63661e18a94c403a8f45f69/docs/upgrade.html#L24 > > Please add information about change made in this PR over there. Updated documentation to indicate that we have deprecated the `log.message.timestamp.difference.max.ms` config and added two new configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
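Per the comment above, KIP-937 deprecates `log.message.timestamp.difference.max.ms` in favor of two separate bounds. A hedged sketch of the resulting broker configuration (the config names are taken from KIP-937 and the values are purely illustrative; verify both against the merged upgrade notes):

```properties
# Deprecated by KIP-937:
# log.message.timestamp.difference.max.ms=86400000

# KIP-937 replacements: separate bounds for timestamps before and after
# the broker's current time (values illustrative, not defaults):
log.message.timestamp.before.max.ms=86400000
log.message.timestamp.after.max.ms=3600000
```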
[jira] [Comment Edited] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?
[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753049#comment-17753049 ] Kong Yin Lai edited comment on KAFKA-10875 at 8/11/23 5:14 AM: --- We are also seeing something that looks a lot like this issue. Software we are using: * Kafka version 2.6.0 * librdkafka 1.9.2 It is quite rare and only seems to affect "compact,delete" topics where a lot of compaction is happening i.e. data is continually being published that "replaces" almost all of existing data with the same key. And in all cases seen so far, a restart of the client would successfully retrieve the offset for the time that it requests (which is 12am of the current day in local time). was (Author: kong): We are also seeing something that looks a lot like this issue. Software we are using: * Kafka version 2.6.0 * librdkakfka 1.9.2 It is quite rare and only seems to affect "compact,delete" topics where a lot of compaction is happening i.e. data is continually being published that "replaces" almost all of existing data with the same key. And in all cases seen so far, a restart of the client would successfully retrieve the offset for the time that it requests (which is 12am of the current day in local time). > offsetsForTimes returns null for some partitions when it shouldn't? > --- > > Key: KAFKA-10875 > URL: https://issues.apache.org/jira/browse/KAFKA-10875 > Project: Kafka > Issue Type: Bug >Reporter: Yifei Gong >Priority: Minor > > I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1 > I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am > trying to seek to offsets after certain timestamp inside > {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}. 
> I found this strange behavior of method {{offsetsForTimes}}:
> When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 5:06:55.534 AM) like below:
> {code:java}
> @Override
> public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
>     // calling assignment just to ensure my consumer is actually assigned the partitions
>     Set<TopicPartition> tps = consumer.assignment();
>     Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
>     offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
>         .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L))));
> }
> {code}
> By setting a breakpoint, I can see I got the below map:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
> {TopicPartition@5498} "My.Data.Topic-5" -> null
> {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
> {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)"
> {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)"
> {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)"
> {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)"
> {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)"
> {TopicPartition@5518} "My.Data.Topic-6" -> null
> {noformat}
> As you can see, for some of the partitions (5 and 6) it returned null.
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT > December 14, 2020 10:30:18.423 AM), I got offsets for all partitions: > {noformat} > {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} > "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)" > {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} > "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)" > {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} > "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)" > {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} > "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)" > {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} > "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)" > {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} > "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)" > {TopicPartition@5510} "My.Data.Topic-9" ->
[jira] [Commented] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?
[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753049#comment-17753049 ] Kong Yin Lai commented on KAFKA-10875: -- We are also seeing something that looks a lot like this issue. Software we are using: * Kafka version 2.6.0 * librdkafka 1.9.2 It is quite rare and only seems to affect "compact,delete" topics where a lot of compaction is happening i.e. data is continually being published that "replaces" almost all of existing data with the same key. And in all cases seen so far, a restart of the client would successfully retrieve the offset for the time that it requests (which is 12am of the current day in local time). > offsetsForTimes returns null for some partitions when it shouldn't? > --- > > Key: KAFKA-10875 > URL: https://issues.apache.org/jira/browse/KAFKA-10875 > Project: Kafka > Issue Type: Bug >Reporter: Yifei Gong >Priority: Minor > > I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1 > I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am > trying to seek to offsets after certain timestamp inside > {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}. 
> I found this strange behavior of method {{offsetsForTimes}}:
> When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 5:06:55.534 AM) like below:
> {code:java}
> @Override
> public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
>     // calling assignment just to ensure my consumer is actually assigned the partitions
>     Set<TopicPartition> tps = consumer.assignment();
>     Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
>     offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
>         .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L))));
> }
> {code}
> By setting a breakpoint, I can see I got the below map:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
> {TopicPartition@5498} "My.Data.Topic-5" -> null
> {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
> {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)"
> {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)"
> {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)"
> {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)"
> {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)"
> {TopicPartition@5518} "My.Data.Topic-6" -> null
> {noformat}
> As you can see, for some of the partitions (5 and 6) it returned null.
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT > December 14, 2020 10:30:18.423 AM), I got offsets for all partitions: > {noformat} > {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} > "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)" > {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} > "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)" > {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} > "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)" > {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} > "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)" > {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} > "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)" > {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} > "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)" > {TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} > "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)" > {TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} > "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)" > {TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} > "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)" > {TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} > "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)" > {noformat} > So I am confused why seeking to an older timestamp gave me nulls when there > are indeed
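For context on the reports above: `KafkaConsumer#offsetsForTimes` is documented to return null for a partition when there is no record whose timestamp is greater than or equal to the requested target, which is why a null for the *older* timestamp (while the newer one succeeds) looks wrong. A minimal sketch of the expected per-partition lookup semantics, as an illustration rather than Kafka's actual implementation:

```java
import java.util.List;

public class OffsetForTimeSketch {
    // One indexed record in a partition's log.
    public record Entry(long offset, long timestamp) {}

    // Expected offsetsForTimes semantics for a single partition: the earliest
    // offset whose timestamp is >= the target, or null when no such record
    // exists. The log list is assumed ordered by offset, timestamps ascending.
    public static Entry lookup(List<Entry> log, long targetTs) {
        for (Entry e : log) {
            if (e.timestamp() >= targetTs) {
                return e;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(new Entry(100, 1_000L), new Entry(101, 2_000L));
        // Older target -> first record at/after it (offset 101 here).
        System.out.println(lookup(log, 1_500L));
        // Target beyond every record timestamp -> null.
        System.out.println(lookup(log, 3_000L));
    }
}
```

Under this contract, an older target should be *easier* to satisfy than a newer one, so the reported nulls for partitions 5 and 6 on the earlier timestamp suggest a lookup bug rather than expected behavior.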
[GitHub] [kafka] bachmanity1 commented on pull request #14190: KAFKA-7438: Replace Easymock & Powermock with Mockito in RocksDBMetricsRecorderGa…
bachmanity1 commented on PR #14190: URL: https://github.com/apache/kafka/pull/14190#issuecomment-1674220529 @cadonna @divijvaidya can you please review this? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bachmanity1 opened a new pull request, #14190: KAFKA-7438: Replace Easymock & Powermock with Mockito in RocksDBMetricsRecorderGa…
bachmanity1 opened a new pull request, #14190: URL: https://github.com/apache/kafka/pull/14190 Replace Easymock & Powermock with Mockito in RocksDBMetricsRecorderGaugesTest. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bachmanity1 commented on pull request #14189: KAFKA-7438: Replace Easymock & Powermock with Mockito in TableSourceNodeTest
bachmanity1 commented on PR #14189: URL: https://github.com/apache/kafka/pull/14189#issuecomment-1674210277 @cadonna @mjsax @C0urante can you please review this PR? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bachmanity1 opened a new pull request, #14189: KAFKA-7438: Replace Easymock & Powermock with Mockito in TableSourceNodeTest
bachmanity1 opened a new pull request, #14189: URL: https://github.com/apache/kafka/pull/14189 Replace Easymock & Powermock with Mockito in TableSourceNodeTest ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable
C0urante commented on PR #14177: URL: https://github.com/apache/kafka/pull/14177#issuecomment-1674205775 Changes seem fine, but CI results are a little worrying. Nothing directly tied to this PR on the surface, but I've kicked off another build just to be safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build
C0urante commented on PR #14186: URL: https://github.com/apache/kafka/pull/14186#issuecomment-1674202980 @divijvaidya given that this touches on the strict stubbing feature that you've helped introduce to our tests, would you be interested in reviewing? @mumrah @ijuma based on discussion on [KAFKA-4594](https://issues.apache.org/jira/browse/KAFKA-4594) and https://github.com/apache/kafka/pull/2695 it seems like the original intent behind the two-task approach (i.e., running `unitTest integrationTest` instead of just `test`) was to get faster feedback from our CI builds, but that this was a minor concern compared to the goal of the ticket to just be able to run unit tests in isolation locally. Given that it's a bit like finding a needle in a haystack now to sift through in-progress CI results and this PR doesn't compromise the original goal laid out in Jira, this change seems safe enough to make. However, in case there's context I'm missing or you feel differently, I wanted to give you a chance to weigh in on this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290880628 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; Review Comment: Changed a few of them, rest we discussed on call -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
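The quota computation in the first step of the breakdown above is the usual floor/remainder split. A minimal sketch follows; the names mirror the `minQuota` and `expectedNumMembersWithExtraPartition` fields in the diff, but the code is illustrative, not Kafka's actual implementation:

```java
public class QuotaSketch {
    // Each member is owed at least floor(totalPartitions / memberCount)
    // partitions; (totalPartitions % memberCount) members receive one extra
    // so that every partition is assigned exactly once.
    public static int[] quotas(int totalPartitions, int memberCount) {
        int minQuota = totalPartitions / memberCount;
        int expectedNumMembersWithExtraPartition = totalPartitions % memberCount;
        return new int[] { minQuota, expectedNumMembersWithExtraPartition };
    }

    public static void main(String[] args) {
        int[] q = quotas(10, 4);
        // minQuota=2, two members get 2+1=3: 3+3+2+2 covers all 10 partitions.
        System.out.println(q[0] + " " + q[1]);
    }
}
```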
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290880471 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; +// Count of members expected to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int expectedNumMembersWithExtraPartition; +// Map of members to their remaining partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the number of partitions they still need to meet the full quota. 
+// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +// Partitions that still need to be assigned. +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the previous owner of each partition when using rack-aware strategy. +private final Map partitionToPrevOwner; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignor(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.assignmentSpec = assignmentSpec; + +subscriptionList = new ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + +int totalPartitionsCount = 0; +// Removes the current topic from
[GitHub] [kafka] C0urante commented on pull request #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build
C0urante commented on PR #14186: URL: https://github.com/apache/kafka/pull/14186#issuecomment-1674193833 As we can see with the [CI results for the first build](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14186/1/tests), unused stubbings were successfully reported. I've reverted the intentionally-failing commit now, and this should be ready for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM
[ https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-15083:
Description:
Based on [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:

|_remote.log.metadata.*_|The default RLMM implementation creates producer and consumer instances. Common client properties can be configured with the `remote.log.metadata.common.client.` prefix. Users can also pass producer/consumer-specific properties with the `remote.log.metadata.producer.` and `remote.log.metadata.consumer.` prefixes; these override properties with the `remote.log.metadata.common.client.` prefix. Any other properties should be prefixed with "remote.log.metadata." and will be passed to RemoteLogMetadataManager#configure(Map props). For example, the security configuration to connect to the local broker for the configured listener name is passed with props.|

This is missing from the current implementation. When configuring RLMM, the config passed into the {{configure}} method is the {{RemoteLogManagerConfig}}. But {{RemoteLogManagerConfig}} has no configs related to {{remote.log.metadata.*}}, e.g. {{remote.log.metadata.topic.replication.factor}}. So even if users set the config on the broker, it is never applied.

This PR fixes the issue by letting users set an RLMM prefix, {{remote.log.metadata.manager.impl.prefix}} (default: {{rlmm.config.}}); any {{remote.log.metadata.*}} config appended to that prefix is passed into RLMM, including configs with the {{remote.log.metadata.common.client.}}/{{remote.log.metadata.producer.}}/{{remote.log.metadata.consumer.}} prefixes. Ex:

{code:java}
# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

# pass to RLMM
rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4

# pass to RSM
rsm.config.test=value
{code}

was: the previous revision of the same description, without the {code} markers around the example.

> Passing "remote.log.metadata.*" configs into RLMM
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
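The prefix routing described above can be sketched as follows. This is a minimal, hypothetical helper (class and method names are illustrative, not the actual Kafka implementation — Kafka's `AbstractConfig` exposes a similar `originalsWithPrefix` method): properties carrying the `rlmm.config.` prefix are selected, the prefix is stripped, and the result is what would be handed to `RemoteLogMetadataManager#configure`.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixConfigDemo {
    // Keep only entries whose key starts with `prefix`, with the prefix removed.
    // Mirrors the idea behind Kafka's AbstractConfig#originalsWithPrefix.
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> brokerProps = new HashMap<>();
        brokerProps.put("rlmm.config.remote.log.metadata.topic.num.partitions", "50");
        brokerProps.put("rlmm.config.remote.log.metadata.topic.replication.factor", "4");
        brokerProps.put("rsm.config.test", "value");

        // Only the rlmm.config.-prefixed entries survive, prefix stripped.
        Map<String, String> rlmmProps = withPrefix(brokerProps, "rlmm.config.");
        if (!"50".equals(rlmmProps.get("remote.log.metadata.topic.num.partitions")))
            throw new AssertionError();
        if (rlmmProps.containsKey("test"))
            throw new AssertionError();
    }
}
```

With this shape, the RSM and RLMM each receive only their own configs, and a broker-level typo in the prefix simply results in the property never reaching the plugin.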
[GitHub] [kafka] showuon merged pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix
showuon merged PR #14151: URL: https://github.com/apache/kafka/pull/14151 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM
[ https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-15083:
Description:
Based on [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:

|_remote.log.metadata.*_|The default RLMM implementation creates producer and consumer instances. Common client properties can be configured with the `remote.log.metadata.common.client.` prefix. Users can also pass producer/consumer-specific properties with the `remote.log.metadata.producer.` and `remote.log.metadata.consumer.` prefixes; these override properties with the `remote.log.metadata.common.client.` prefix. Any other properties should be prefixed with "remote.log.metadata." and will be passed to RemoteLogMetadataManager#configure(Map props). For example, the security configuration to connect to the local broker for the configured listener name is passed with props.|

This is missing from the current implementation. When configuring RLMM, the config passed into the {{configure}} method is the {{RemoteLogManagerConfig}}. But {{RemoteLogManagerConfig}} has no configs related to {{remote.log.metadata.*}}, e.g. {{remote.log.metadata.topic.replication.factor}}. So even if users set the config on the broker, it is never applied.

This PR fixes the issue by letting users set an RLMM prefix, {{remote.log.metadata.manager.impl.prefix}} (default: {{rlmm.config.}}); any {{remote.log.metadata.*}} config appended to that prefix is passed into RLMM, including configs with the {{remote.log.metadata.common.client.}}/{{remote.log.metadata.producer.}}/{{remote.log.metadata.consumer.}} prefixes. Ex:

# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4
rsm.config.test=value

was: the previous, shorter revision of the description, ending at "This is missed from current implementation."

> Passing "remote.log.metadata.*" configs into RLMM
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
[GitHub] [kafka] showuon commented on pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix
showuon commented on PR #14151: URL: https://github.com/apache/kafka/pull/14151#issuecomment-1674147451

Failed tests are unrelated:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
Build / JDK 11 and Scala 2.13 / kafka.log.remote.RemoteIndexCacheTest.testClose()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testConfigResourceExistenceChecker()
Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOffsetTranslationBehindReplicationFlow()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #14121: MINOR: Add comment to onPartitionsLost override
showuon merged PR #14121: URL: https://github.com/apache/kafka/pull/14121 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #14121: MINOR: Add comment to onPartitionsLost override
showuon commented on PR #14121: URL: https://github.com/apache/kafka/pull/14121#issuecomment-1674146055 Failed tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest
[ https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deng Ziming updated KAFKA-15289: Description: we are calling `zkBrokerApis` in , we should ensure kraft broker apis are also supported, so use clientApis as far as possible.use zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis in most cases (was: In most Test cases, we are calling `zkBrokerApis`, we should ensure kraft broker apis are also supported, so use clientApis as far as possible.) > Support KRaft mode in RequestQuotaTest > -- > > Key: KAFKA-15289 > URL: https://issues.apache.org/jira/browse/KAFKA-15289 > Project: Kafka > Issue Type: Sub-task >Reporter: Deng Ziming >Priority: Major > Labels: newbee > > we are calling `zkBrokerApis` in , we should ensure kraft broker apis are > also supported, so use clientApis as far as possible.use > zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis in most cases -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest
[ https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deng Ziming updated KAFKA-15289: Description: we are calling `zkBrokerApis` in RequestQuotaTest, we should ensure kraft broker apis are also supported, so use clientApis as far as possible.use zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis. (was: we are calling `zkBrokerApis` in , we should ensure kraft broker apis are also supported, so use clientApis as far as possible.use zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis in most cases) > Support KRaft mode in RequestQuotaTest > -- > > Key: KAFKA-15289 > URL: https://issues.apache.org/jira/browse/KAFKA-15289 > Project: Kafka > Issue Type: Sub-task >Reporter: Deng Ziming >Priority: Major > Labels: newbee > > we are calling `zkBrokerApis` in RequestQuotaTest, we should ensure kraft > broker apis are also supported, so use clientApis as far as possible.use > zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest
[ https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deng Ziming updated KAFKA-15289: Summary: Support KRaft mode in RequestQuotaTest (was: Use zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis in most cases) > Support KRaft mode in RequestQuotaTest > -- > > Key: KAFKA-15289 > URL: https://issues.apache.org/jira/browse/KAFKA-15289 > Project: Kafka > Issue Type: Sub-task >Reporter: Deng Ziming >Priority: Major > Labels: newbee > > In most Test cases, we are calling `zkBrokerApis`, we should ensure kraft > broker apis are also supported, so use clientApis as far as possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15287) Change NodeApiVersions.create() to contains both apis of zk and kraft broker
[ https://issues.apache.org/jira/browse/KAFKA-15287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deng Ziming resolved KAFKA-15287. - Resolution: Fixed > Change NodeApiVersions.create() to contains both apis of zk and kraft broker > - > > Key: KAFKA-15287 > URL: https://issues.apache.org/jira/browse/KAFKA-15287 > Project: Kafka > Issue Type: Sub-task >Reporter: Deng Ziming >Priority: Major > Labels: newbee > > We are using ApiKeys.zkBrokerApis() when calling NodeApiVersions.create(), > this means we only support zk broker apis. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dengziming merged pull request #14185: KAFKA-15287: Change NodeApiVersions.create() to support kraft
dengziming merged PR #14185: URL: https://github.com/apache/kafka/pull/14185 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nelson Bighetti updated KAFKA-14132: Description: {color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [[~bachmanity1] ]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#de350b}Some of the tests below use EasyMock as well. 
For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. 
For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) > # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ConnectorsResourceTest{color} (owner:
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nelson Bighetti updated KAFKA-14132: Description: {color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: @bachmanity ]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#de350b}Some of the tests below use EasyMock as well. 
For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [[~bachmanity1] ]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. 
For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) > # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) >
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
jeffkbkim commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290813753 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; +// Count of members expected to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int expectedNumMembersWithExtraPartition; +// Map of members to their remaining partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the number of partitions they still need to meet the full quota. 
+// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +// Partitions that still need to be assigned. +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the previous owner of each partition when using rack-aware strategy. +private final Map partitionToPrevOwner; Review Comment: it is implied by the type that the previous owners are keyed by TopicIdPartition. like how we are using `potentiallyUnfilledMembers`, `unfilledMembers`, or `newAssignment` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
jeffkbkim commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290813322 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; +// Count of members expected to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int expectedNumMembersWithExtraPartition; Review Comment: `numMembersWithExtraPartition` : number of members to get the extra partition, no? there is no expected vs. actual though, we always distribute that number -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
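The quota discussion in the review comment above (minQuota vs. the number of members receiving an extra partition) boils down to integer division and remainder. A minimal sketch, with hypothetical class and method names rather than the PR's actual code:

```java
// Hypothetical sketch of the quota computation described in the assignor's javadoc.
// With T total partitions and M members, each member is guaranteed T / M partitions
// (minQuota), and T % M members receive one extra partition.
public class QuotaSketch {
    // Returns { minQuota, numMembersWithExtraPartition } for the given counts.
    static int[] quotas(int totalPartitions, int memberCount) {
        int minQuota = totalPartitions / memberCount;
        int numMembersWithExtraPartition = totalPartitions % memberCount;
        return new int[] {minQuota, numMembersWithExtraPartition};
    }

    public static void main(String[] args) {
        // 10 partitions across 3 members: everyone gets 3, one member gets a 4th.
        int[] q = quotas(10, 3);
        System.out.println(q[0] + " " + q[1]); // prints "3 1"
    }
}
```

This also illustrates the naming point raised in the comment: the remainder is exactly the number of members that will get the extra partition, so there is no expected-vs-actual distinction.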
[GitHub] [kafka] gharris1727 merged pull request #14188: KAFKA-15239: Fix ThroughputThrottler import-control
gharris1727 merged PR #14188: URL: https://github.com/apache/kafka/pull/14188
[GitHub] [kafka] gharris1727 opened a new pull request, #14188: KAFKA-15239: Fix ThroughputThrottler import-control
gharris1727 opened a new pull request, #14188: URL: https://github.com/apache/kafka/pull/14188

#14092 broke checkstyle due to the package name changing.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Reopened] (KAFKA-14598) Fix flaky ConnectRestApiTest
[ https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reopened KAFKA-14598: - Assignee: (was: Ashwin Pankaj) > Fix flaky ConnectRestApiTest > > > Key: KAFKA-14598 > URL: https://issues.apache.org/jira/browse/KAFKA-14598 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ashwin Pankaj >Priority: Minor > Labels: flaky-test > > ConnectRestApiTest sometimes fails with the message > {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not > Found\n\nHTTP ERROR 404 Not > Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not > > Found\nSERVLET:-\n\n\n\n\n', > 'http://172.31.1.75:8083/connector-plugins/')}} > This happens because ConnectDistributedService.start() by default waits till > the the line > {{Joined group at generation ..}} is visible in the logs. > In most cases this is sufficient. But in the cases where the test fails, we > see that this message appears even before Connect RestServer has finished > initialization. > {quote} - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, > groupId=connect-cluster] Joined group at generation 2 with protocol version 1 > and got assignment: Assignment{error=0, > leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', > leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], > revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 > +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" > "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer) > - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is > started and ready to handle requests > (org.apache.kafka.connect.runtime.rest.RestServer) > {quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290766391 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; +// Count of members expected to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int expectedNumMembersWithExtraPartition; +// Map of members to their remaining partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the number of partitions they still need to meet the full quota. 
+// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +// Partitions that still need to be assigned. +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the previous owner of each partition when using rack-aware strategy. +private final Map partitionToPrevOwner; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignor(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.assignmentSpec = assignmentSpec; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290765488 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; +// Count of members expected to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int expectedNumMembersWithExtraPartition; +// Map of members to their remaining partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the number of partitions they still need to meet the full quota. 
+// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +// Partitions that still need to be assigned. +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the previous owner of each partition when using rack-aware strategy. +private final Map partitionToPrevOwner; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignor(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.assignmentSpec = assignmentSpec; + +subscriptionList = new ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + +int totalPartitionsCount = 0; +// Removes the current topic from
[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1290474356 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -140,17 +167,30 @@ public long metadataExpireMs() { } /** - * Request an update of the current cluster metadata info, return the current updateVersion before the update + * Request an update of the current cluster metadata info, permitting backoff based on the number of + * equivalent responses, which indicate that metadata responses did not make progress and may be stale. Review Comment: Could we add a comment on when the caller should set permitBackoffOnEquivalentResponses to true? Also, when should backoffOnEquivalentResponses be reset to false? ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -114,18 +127,32 @@ public synchronized Cluster fetch() { /** * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed). + * There are two calculations for backing off based on how many attempts to retrieve metadata have been made + * since the last successful response, and how many equivalent metadata responses have been received. + * The second of these allows backing off when there are errors to do with stale metadata, even though the + * metadata responses are clean. + * + * This can be used to check whether it's worth requesting an update in the knowledge that it will + * not be delayed if this method returns 0. 
* * @param nowMs current time in ms * @return remaining time in ms till the cluster info can be updated again */ public synchronized long timeToAllowUpdate(long nowMs) { -return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0); +// Calculate the backoff for attempts which acts when metadata responses fail +long backoffForAttempts = Math.max(this.lastRefreshMs + this.refreshBackoff.backoff(this.attempts) - nowMs, 0); + +// Calculate the backoff for equivalent responses which acts when metadata responses as not making progress Review Comment: as not making => are not making ? ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -598,22 +608,22 @@ private void insertInSequenceOrder(Deque deque, ProducerBatch bat /** * Add the leader to the ready nodes if the batch is ready * - * @param nowMs The current time * @param exhausted 'true' is the buffer pool is exhausted * @param part The partition * @param leader The leader for the partition * @param waitedTimeMs How long batch waited * @param backingOff Is backing off + * @param backoffAttempts Number of attempts for calculating backoff delay * @param full Is batch full * @param nextReadyCheckDelayMs The delay for next check * @param readyNodes The set of ready nodes (to be filled in) * @return The delay for next check */ -private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node leader, -long waitedTimeMs, boolean backingOff, boolean full, -long nextReadyCheckDelayMs, Set readyNodes) { +private long batchReady(boolean exhausted, TopicPartition part, Node leader, +long waitedTimeMs, boolean backingOff, int backoffAttempts, +boolean full, long nextReadyCheckDelayMs, Set readyNodes) { if (!readyNodes.contains(leader) && !isMuted(part)) { -long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; +long timeToWaitMs = backingOff ? 
retryBackoff.backoff(backoffAttempts) : lingerMs; Review Comment: Here, backoffAttempts is already subtracted by one when calling retryBackoff.backoff. In other places, we pass in the true attempts and subtract by one when calling retryBackoff.backoff. It would be useful to make that consistent. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -550,7 +550,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { // refresh metadata before re-joining the group as long as the refresh backoff time has // passed. if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) { -this.metadata.requestUpdate(); +this.metadata.requestUpdate(true); Review Comment: Why is permitBackoffOnEquivalentResponses set to true? We refresh the metadata here not because we have discovered stale metadata. ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -140,17 +167,30 @@ public long metadataExpireMs() { } /** - * Request an update
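The KIP-580 review above centers on `retryBackoff.backoff(attempts)` and whether the attempt count is pre-decremented by the caller. An illustrative reimplementation of the underlying computation (hypothetical names, not Kafka's actual `ExponentialBackoff` API; jitter omitted for clarity) shows why consistency matters: an off-by-one in `attempts` changes the delay by the full multiplier.

```java
// Illustrative sketch of exponential backoff in the spirit of KIP-580:
// the delay grows by a constant multiplier per attempt, capped at a maximum.
public class BackoffSketch {
    static long backoff(long initialMs, double multiplier, long maxMs, int attempts) {
        double delay = initialMs * Math.pow(multiplier, attempts);
        return (long) Math.min(delay, (double) maxMs);
    }

    public static void main(String[] args) {
        // With initial=100ms, multiplier=2, max=1000ms:
        System.out.println(backoff(100, 2.0, 1000, 0)); // 100
        System.out.println(backoff(100, 2.0, 1000, 3)); // 800
        System.out.println(backoff(100, 2.0, 1000, 5)); // 1000 (capped)
    }
}
```

Passing `attempts` versus `attempts - 1` here yields 800ms versus 400ms for the third retry, which is the inconsistency the reviewer asks to normalize across call sites.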
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290764909 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; +// Count of members expected to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int expectedNumMembersWithExtraPartition; +// Map of members to their remaining partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the number of partitions they still need to meet the full quota. 
+// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +// Partitions that still need to be assigned. +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the previous owner of each partition when using rack-aware strategy. +private final Map partitionToPrevOwner; Review Comment: I think this name makes it clear that its a map, if we just used previousOwners, its not clear whose previous owner right. We would then have to name it previousPartitionOwners, which is kinda the same as partitionToPreviousOwner? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290763624 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. 
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +private final RackInfo rackInfo; +// The minimum required quota that each member needs to meet for a balanced assignment. +// This is the same for all members. +private final int minQuota; +// Count of members expected to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int expectedNumMembersWithExtraPartition; Review Comment: yeah because the extra partitions haven't been assigned yet, I'm just taking a count of how many are expected to get the extra partition -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290763071 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * Review Comment: including balance >rack>stickiness -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
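The fallback order in the assignor's javadoc (rack-compatible unfilled member, then tracked previous owner, then generic round-robin) can be sketched as a single selection function. Names and types here are illustrative, not the PR's code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the member-selection order described in the javadoc:
// prefer a rack-compatible unfilled member, then the partition's tracked
// previous owner, then plain round-robin over the unfilled members.
public class RackAwarePickSketch {
    static String pickMember(String partitionRack,
                             Map<String, String> memberRacks,  // member -> rack
                             List<String> unfilledMembers,     // members below full quota
                             String previousOwner) {
        // 1. First unfilled member whose rack matches the partition's rack.
        for (String member : unfilledMembers) {
            if (partitionRack != null && partitionRack.equals(memberRacks.get(member))) {
                return member;
            }
        }
        // 2. Fall back to the previous owner, if it still has quota headroom.
        if (previousOwner != null && unfilledMembers.contains(previousOwner)) {
            return previousOwner;
        }
        // 3. Generic round-robin fallback: the next unfilled member.
        return unfilledMembers.isEmpty() ? null : unfilledMembers.get(0);
    }

    public static void main(String[] args) {
        Map<String, String> racks = new HashMap<>();
        racks.put("m1", "rackA");
        racks.put("m2", "rackB");
        List<String> unfilled = Arrays.asList("m1", "m2");
        System.out.println(pickMember("rackB", racks, unfilled, null)); // m2
        System.out.println(pickMember("rackC", racks, unfilled, "m2")); // m2 (previous owner)
    }
}
```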
[GitHub] [kafka] philipnee commented on pull request #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known
philipnee commented on PR #14179: URL: https://github.com/apache/kafka/pull/14179#issuecomment-1674034799 @junrao - This minor PR fixes a small bug in the commit request manager. Would you be able to review it?
[GitHub] [kafka] philipnee commented on pull request #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known
philipnee commented on PR #14179: URL: https://github.com/apache/kafka/pull/14179#issuecomment-1674034179 Here are the failing flaky tests.
```
Build / JDK 17 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 1m 48s
Build / JDK 17 and Scala 2.13 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest 16s
Build / JDK 17 and Scala 2.13 / shouldThrowStreamsExceptionOnStartupIfExceptionOccurred – org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest <1s
Build / JDK 8 and Scala 2.12 / testReplication() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest 1m 45s
Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest 1m 17s
Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest 1m 16s
Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 13s
Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 14s
Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 1m 55s
Build / JDK 20 and Scala 2.13 / testSimultaneousUpwardAndDownwardDelegating – org.apache.kafka.connect.runtime.isolation.SynchronizationTest 13s
Build / JDK 20 and Scala 2.13 / testRackAwareRangeAssignor() – integration.kafka.server.FetchFromFollowerIntegrationTest 40s
Build / JDK 20 and Scala 2.13 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest 19s
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[GitHub] [kafka] mjsax commented on a diff in pull request #14187: KAFKA-13197: fix GlobalKTable join/left-join semantics documentation.
mjsax commented on code in PR #14187: URL: https://github.com/apache/kafka/pull/14187#discussion_r1290758646 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -2805,7 +2805,7 @@ KStream leftJoin(final KTable table, * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as the key of this {@code KStream}. - * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join + * If a {@code KStream} input value is {@code null} the record will not be included in the join * operation and thus no output record will be added to the resulting {@code KStream}. * If {@code keyValueMapper} returns {@code null} implying no match exists, no output record will be added to the * resulting {@code KStream}. Review Comment: Should we merge both sentences: ``` If a {@code KStream} input record key or value is {@code null} the record will not be included in the join * If a {@code KStream} input value is {@code null} the record will not be included in the join * operation and thus no output record will be added to the resulting {@code KStream}. * If {@code keyValueMapper} returns {@code null} implying no match exists, no output record will be added to the * resulting {@code KStream}. ``` Into: ``` * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. ``` Same below? (Note: for left-join the JavaDocs about `null` returned from `keyValueMapper` seem to be wrong, too?)
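The null-handling semantics being discussed above can be summarized in a small model. The following is a minimal stdlib sketch of the documented KStream-GlobalKTable inner-join behavior, not the actual Kafka Streams implementation; the `JoinSemanticsSketch` class, the `Map`-backed table, and the `Optional` return type are all inventions of this sketch. It reflects the corrected wording: only a null stream value (not a null stream key) excludes a record, and a null result from the `keyValueMapper` likewise produces no output.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

public class JoinSemanticsSketch {

    // Models the documented semantics: a null stream value, a null key returned
    // by the keyValueMapper, or a missing table entry all yield no output record.
    // The stream key itself is deliberately not checked.
    public static <K, V, GK, GV, R> Optional<R> innerJoin(
            K key, V value,
            BiFunction<K, V, GK> keyValueMapper,
            Map<GK, GV> globalTable,
            BiFunction<V, GV, R> joiner) {
        if (value == null) {
            return Optional.empty();       // null stream value: record dropped
        }
        GK mappedKey = keyValueMapper.apply(key, value);
        if (mappedKey == null) {
            return Optional.empty();       // mapper returned null: no match, no output
        }
        GV tableValue = globalTable.get(mappedKey);
        if (tableValue == null) {
            return Optional.empty();       // no matching GlobalKTable record
        }
        return Optional.of(joiner.apply(value, tableValue));
    }
}
```

Under this model, merging the two JavaDoc sentences as proposed is accurate: both null cases take the same "no output record" path.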
[jira] [Updated] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation
[ https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13197: Component/s: documentation streams > KStream-GlobalKTable join semantics don't match documentation > - > > Key: KAFKA-13197 > URL: https://issues.apache.org/jira/browse/KAFKA-13197 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 2.7.0 >Reporter: Tommy Becker >Assignee: Florin Akermann >Priority: Major > > As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was > changed. It appears the change was intended to merely relax a requirement but > it actually broke backwards compatibility. Although it does allow {{null}} > keys and values in the KStream to be joined, it now excludes {{null}} results > of the {{KeyValueMapper}}. We have an application which can return {{null}} > from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on > these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still > explicitly says this is done: > {quote}If a KStream input record key or value is null the record will not be > included in the join operation and thus no output record will be added to the > resulting KStream. > If keyValueMapper returns null implying no match exists, a null value will > be provided to ValueJoiner. > {quote} > Both these statements are incorrect. > I think the new behavior is worse than the previous/documented behavior. It > feels more reasonable to have a non-null stream record map to a null join key > (our use-case is event-enhancement where the incoming record doesn't have the > join field), than the reverse. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service
gharris1727 commented on PR #14092: URL: https://github.com/apache/kafka/pull/14092#issuecomment-1673942664 Thank you @fvaleri for diagnosing this test regression and promptly fixing it!
[jira] [Resolved] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15239. - Fix Version/s: 3.6.0 Reviewer: Greg Harris Resolution: Fixed > producerPerformance system test for old client failed after v3.5.0 > -- > > Key: KAFKA-15239 > URL: https://issues.apache.org/jira/browse/KAFKA-15239 > Project: Kafka > Issue Type: Test > Components: system tests >Affects Versions: 3.6.0 >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Fix For: 3.6.0 > > > While running producer performance tool in system test for old client (ex: > quota_test), we will try to run with the dev-branch's jar file, to make sure > it is backward compatible, as described > [here|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/producer_performance.py#L86-L88],. > {code:java} > # In order to ensure more consistent configuration between versions, always > use the ProducerPerformance tool from the development branch {code} > > But in KAFKA-14525, we're moving tools from core module to a separate tool > module, we're actually breaking the backward compatibility. We should fix the > system test. Also maybe we should also mention anywhere about this backward > compatibility issue? > Note: > This is the command run in system test. Suppose it's testing old client 3.4.0 > (file put under `~/Downloads/kafka_2.13-3.4.0` in my env), and running under > the latest trunk env. 
{code:java}
for file in ./tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; for file in ./tools/build/dependant-libs*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; export CLASSPATH; export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:config/tools-log4j.properties"; KAFKA_OPTS= KAFKA_HEAP_OPTS="-XX:+HeapDumpOnOutOfMemoryError" ~/Downloads/kafka_2.13-3.4.0/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic test_topic --num-records 5 --record-size 3000 --throughput -1 --producer-props bootstrap.servers=localhost:9092 client.id=overridden_id
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThroughputThrottler
    at org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101)
    at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThroughputThrottler
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
    ... 2 more
{code}
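The failure above is a classpath-composition problem: the old client's run script launches the new tools jar, but the class `org.apache.kafka.common.utils.ThroughputThrottler` is not on the resulting classpath. As a hedged illustration (not part of Kafka's tooling), a launcher can probe for a required class up front instead of failing deep inside `main`; the `ClasspathCheck` class and `isPresent` helper here are inventions of this sketch.

```java
public class ClasspathCheck {

    // Probes the current classpath for a class by name without initializing it.
    public static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A JDK class is always present; on a bare JVM the Kafka class is not,
        // which is exactly the condition behind the ClassNotFoundException above.
        System.out.println(isPresent("java.util.List"));
        System.out.println(isPresent("org.apache.kafka.common.utils.ThroughputThrottler"));
    }
}
```

A check like this would turn the late `NoClassDefFoundError` into an explicit, actionable startup error.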
[GitHub] [kafka] gharris1727 merged pull request #14092: KAFKA-15239: Fix system tests using producer performance service
gharris1727 merged PR #14092: URL: https://github.com/apache/kafka/pull/14092
[jira] [Updated] (KAFKA-15202) MM2 OffsetSyncStore clears too many syncs when sync spacing is variable
[ https://issues.apache.org/jira/browse/KAFKA-15202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15202: Affects Version/s: 3.5.1 (was: 3.3.3) > MM2 OffsetSyncStore clears too many syncs when sync spacing is variable > --- > > Key: KAFKA-15202 > URL: https://issues.apache.org/jira/browse/KAFKA-15202 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.5.0, 3.4.1, 3.5.1 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.3.3, 3.4.2, 3.5.2 > > > The spacing between OffsetSyncs can vary significantly, due to conditions in > the upstream topic and in the replication rate of the MirrorSourceTask. > The OffsetSyncStore attempts to keep a maximal number of distinct syncs > present, and for regularly spaced syncs it does not allow an incoming sync to > expire more than one other unique sync. There are tests to enforce this > property. > For variable spaced syncs, there is no such guarantee, because multiple > fine-grained syncs may need to be expired at the same time. However, instead > of only those fine-grained syncs being expired, the store may also expire > coarser-grained syncs. This causes a large decrease in the number of unique > syncs. > This is an extremely simple example: Syncs: 0 (start), 1, 2, 4. 
The result:
{noformat}
TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=1, downstreamOffset=1} applied, new state is [1:1,0:0] (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=2, downstreamOffset=2} applied, new state is [2:2,1:1,0:0] (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=4, downstreamOffset=4} applied, new state is [4:4,0:0] (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
{noformat}
Instead of being expired, the 2:2 sync should still be present in the final state, allowing the store to maintain 3 unique syncs.
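The retention property described above, where each older retained sync should lag the newest one by a roughly doubling gap so that older syncs thin out gradually rather than being expired in bulk, can be modeled in a few lines. This is a toy model of the intended behavior, not the actual `OffsetSyncStore`; the `retain` method and the plain offset-list representation are assumptions of this sketch. For the example syncs 0, 1, 2, 4 it keeps the 2 sync rather than expiring it.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncSpacingSketch {

    // Returns the retained upstream offsets, newest first. The i-th retained
    // sync (i >= 1) must lag the newest sync by at least 2^(i-1), so older
    // syncs thin out exponentially instead of being dropped in bulk.
    public static List<Long> retain(List<Long> syncsOldestFirst) {
        List<Long> kept = new ArrayList<>();
        long head = syncsOldestFirst.get(syncsOldestFirst.size() - 1);
        kept.add(head);
        long requiredGap = 1;
        for (int i = syncsOldestFirst.size() - 2; i >= 0; i--) {
            long offset = syncsOldestFirst.get(i);
            if (head - offset >= requiredGap) {
                kept.add(offset);
                requiredGap *= 2;    // each older sync must be twice as far back
            }
        }
        return kept;
    }
}
```

Under this model, `retain([0, 1, 2, 4])` keeps all four syncs (4, 2, 1, 0), matching the JIRA's expectation that the 2:2 sync survives the arrival of 4:4.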
[jira] [Updated] (KAFKA-15202) MM2 OffsetSyncStore clears too many syncs when sync spacing is variable
[ https://issues.apache.org/jira/browse/KAFKA-15202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15202: Fix Version/s: 3.3.3 3.4.2 3.5.2 > MM2 OffsetSyncStore clears too many syncs when sync spacing is variable > --- > > Key: KAFKA-15202 > URL: https://issues.apache.org/jira/browse/KAFKA-15202 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.3.3, 3.4.2, 3.5.2 > > > The spacing between OffsetSyncs can vary significantly, due to conditions in > the upstream topic and in the replication rate of the MirrorSourceTask. > The OffsetSyncStore attempts to keep a maximal number of distinct syncs > present, and for regularly spaced syncs it does not allow an incoming sync to > expire more than one other unique sync. There are tests to enforce this > property. > For variable spaced syncs, there is no such guarantee, because multiple > fine-grained syncs may need to be expired at the same time. However, instead > of only those fine-grained syncs being expired, the store may also expire > coarser-grained syncs. This causes a large decrease in the number of unique > syncs. > This is an extremely simple example: Syncs: 0 (start), 1, 2, 4. 
The result:
{noformat}
TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=1, downstreamOffset=1} applied, new state is [1:1,0:0] (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=2, downstreamOffset=2} applied, new state is [2:2,1:1,0:0] (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=4, downstreamOffset=4} applied, new state is [4:4,0:0] (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
{noformat}
Instead of being expired, the 2:2 sync should still be present in the final state, allowing the store to maintain 3 unique syncs.
[jira] [Resolved] (KAFKA-13187) Replace EasyMock and PowerMock with Mockito for DistributedHerderTest
[ https://issues.apache.org/jira/browse/KAFKA-13187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-13187. --- Fix Version/s: 3.6.0 Resolution: Done > Replace EasyMock and PowerMock with Mockito for DistributedHerderTest > - > > Key: KAFKA-13187 > URL: https://issues.apache.org/jira/browse/KAFKA-13187 > Project: Kafka > Issue Type: Sub-task >Reporter: YI-CHEN WANG >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0
[GitHub] [kafka] C0urante merged pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest
C0urante merged PR #14102: URL: https://github.com/apache/kafka/pull/14102
[GitHub] [kafka] C0urante commented on pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest
C0urante commented on PR #14102: URL: https://github.com/apache/kafka/pull/14102#issuecomment-1673896029 I should note that because this test makes heavy use of Mockito's strict stubbing feature, I've opened https://github.com/apache/kafka/pull/14186 to enable reporting of unused stubbings in our CI builds. I've verified locally that there are no unused stubbings in this PR (`./gradlew :connect:runtime:test --tests DistributedHerderTest` does the trick), and any local build that uses the `test` task instead of `unitTest` should also report unused stubbings, so I don't believe it's necessary to block on https://github.com/apache/kafka/pull/14186 before merging this. Test failures in CI also appear unrelated. Merging...
[GitHub] [kafka] C0urante commented on a diff in pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest
C0urante commented on code in PR #14102: URL: https://github.com/apache/kafka/pull/14102#discussion_r1290658873 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -3872,7 +2952,7 @@ public void testVerifyTaskGeneration() { assertThrows(ConnectException.class, () -> herder.verifyTaskGenerationAndOwnership(unassignedTask, 1, verifyCallback)); assertThrows(ConnectException.class, () -> herder.verifyTaskGenerationAndOwnership(unassignedTask, 2, verifyCallback)); -PowerMock.verifyAll(); +verify(verifyCallback, times(3)).onCompletion(isNull(), isNull()); Review Comment: Whoops!
[GitHub] [kafka] C0urante commented on a diff in pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest
C0urante commented on code in PR #14102: URL: https://github.com/apache/kafka/pull/14102#discussion_r1290659476 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -3929,543 +3008,636 @@ public void testPollDurationOnSlowConnectorOperations() { final int rebalanceDelayMs = 2; final long operationDelayMs = 1; final long maxPollWaitMs = rebalanceDelayMs - operationDelayMs; -EasyMock.expect(member.memberId()).andStubReturn("member"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion); +when(member.memberId()).thenReturn("member"); + when(member.currentProtocolVersion()).thenReturn(connectProtocolVersion); // Assign the connector to this worker, and have it start expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs); expectConfigRefreshAndSnapshot(SNAPSHOT); -Capture> onFirstStart = newCapture(); -worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(), -EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onFirstStart)); -PowerMock.expectLastCall().andAnswer(() -> { +ArgumentCaptor> onFirstStart = ArgumentCaptor.forClass(Callback.class); +doAnswer(invocation -> { time.sleep(operationDelayMs); onFirstStart.getValue().onCompletion(null, TargetState.STARTED); return true; -}); -member.wakeup(); -PowerMock.expectLastCall(); -expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS); -// We should poll for less than the delay - time to start the connector, meaning that a long connector start -// does not delay the poll timeout -member.poll(leq(maxPollWaitMs)); -PowerMock.expectLastCall(); +}).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG), any(), eq(herder), eq(TargetState.STARTED), onFirstStart.capture()); +expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); + 
+herder.tick(); // Rebalance again due to config update -member.wakeup(); -PowerMock.expectLastCall(); expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs); - EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); - -worker.stopAndAwaitConnector(CONN1); -PowerMock.expectLastCall(); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion); -Capture> onSecondStart = newCapture(); -worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(), -EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onSecondStart)); -PowerMock.expectLastCall().andAnswer(() -> { + when(configBackingStore.snapshot()).thenReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); +doNothing().when(worker).stopAndAwaitConnector(CONN1); + +ArgumentCaptor> onSecondStart = ArgumentCaptor.forClass(Callback.class); +doAnswer(invocation -> { time.sleep(operationDelayMs); onSecondStart.getValue().onCompletion(null, TargetState.STARTED); return true; -}); -member.wakeup(); -PowerMock.expectLastCall(); -member.poll(leq(maxPollWaitMs)); -PowerMock.expectLastCall(); +}).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG_UPDATED), any(), eq(herder), eq(TargetState.STARTED), onSecondStart.capture()); +expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, invocation -> TASK_CONFIGS); + +configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config +herder.tick(); // Third tick should resolve all outstanding requests expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs); // which includes querying the connector task configs after the update -expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, () -> { +expectExecuteTaskReconfiguration(true, 
conn1SinkConfigUpdated, invocation -> { time.sleep(operationDelayMs); return TASK_CONFIGS; }); -member.poll(leq(maxPollWaitMs)); -PowerMock.expectLastCall(); - -PowerMock.replayAll(); -herder.tick(); -configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config -herder.tick(); herder.tick(); -
[GitHub] [kafka] rittikaadhikari commented on pull request #13503: MINOR: Refactor TierStateMachine related tests into a separate test file
rittikaadhikari commented on PR #13503: URL: https://github.com/apache/kafka/pull/13503#issuecomment-1673867152 @junrao Thanks for the review! I have addressed the comment.
[GitHub] [kafka] gharris1727 merged pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
gharris1727 merged PR #14156: URL: https://github.com/apache/kafka/pull/14156
[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
gharris1727 commented on PR #14156: URL: https://github.com/apache/kafka/pull/14156#issuecomment-1673861990 The latest CI run has some unrelated flakiness, and tests pass locally.
[GitHub] [kafka] C0urante commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility
C0urante commented on code in PR #14082: URL: https://github.com/apache/kafka/pull/14082#discussion_r1290641476 ## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java: ## @@ -43,6 +47,11 @@ public void configure(Map props) { log.info("Using custom remote topic separator: '{}'", separator); separatorPattern = Pattern.compile(Pattern.quote(separator)); } + +if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) { +log.info("Disable the usage of topic separator for internal topics"); Review Comment: I think the first part of this still needs to be addressed? Specifically, we should only emit the log statement if `isInternalTopicSeparatorEnabled` is `false`.
[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
gharris1727 commented on PR #14156: URL: https://github.com/apache/kafka/pull/14156#issuecomment-1673859231 I saw one of the CI builds with testOffsetTranslationBehindReplicationFlow failing, so I think there is still another source of flakiness somewhere. I'll close the fix JIRA but leave the flaky JIRA open.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
jeffkbkim commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1290585581 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * Review Comment: I would include the priorities when considering uniform vs. sticky vs. 
rack-aware ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. + * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. 
+ * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked previous owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignor extends UniformAssignor { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignor.class); +// List of topics subscribed to by all members. +private final List subscriptionList; +private final
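The numbered steps in the assignor's JavaDoc above boil down to: compute a per-member quota, retain rack-compatible sticky partitions, then round-robin the remainder with a rack-match preference and a generic fallback. The following is a heavily simplified stdlib sketch of that shape only; the `RackAwareSketch` class, its `assign` signature, and the single-topic, no-stickiness model are all inventions of this sketch, and the real `OptimizedUniformAssignor` additionally handles stickiness, multiple topics, and prior owners.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RackAwareSketch {

    // Round-robin assignment with a rack-match preference, capped by a
    // per-member quota of ceil(partitions / members).
    public static Map<String, List<Integer>> assign(
            List<String> members,
            Map<String, String> memberRack,
            Map<Integer, String> partitionRack) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        int quota = (int) Math.ceil(partitionRack.size() / (double) members.size());
        for (Map.Entry<Integer, String> partition : partitionRack.entrySet()) {
            String target = null;
            // First pass: prefer a member in the same rack that is under quota.
            for (String member : members) {
                if (assignment.get(member).size() < quota
                        && partition.getValue().equals(memberRack.get(member))) {
                    target = member;
                    break;
                }
            }
            // Fallback: plain round robin over members still under quota.
            if (target == null) {
                for (String member : members) {
                    if (assignment.get(member).size() < quota) {
                        target = member;
                        break;
                    }
                }
            }
            assignment.get(target).add(partition.getKey());
        }
        return assignment;
    }
}
```

With two members in racks A and B and four partitions alternating A/B racks, this yields a balanced, fully rack-aligned assignment, which is the best case the review discussion is aiming for.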
[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
C0urante commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1290629836 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in Connectors and Tasks. This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. SERVICE_LOAD is the fastest strategy, but care should be taken to verify that plugins are compatible before setting this configuration to SERVICE_LOAD. + +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. + +Verifying Plugin Compatibility + +To verify if all of your plugins are compatible, first ensure that you are using version 3.6 or later of the Connect runtime. You can then perform one of the following checks: + + +Start your worker with the default HYBRID_WARN strategy, and WARN logs enabled for the org.apache.kafka.connect package. At least one WARN log message mentioning the plugin.discovery configuration should be printed. This log message will explicitly say that all plugins are compatible, or list the incompatible plugins. +Start your worker in a test environment with HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and all incompatible plugins will be listed in the exception. + + +If the verification step succeeds, then your current set of installed plugins are compatible, and it should be safe to change the plugin.discovery configuration to SERVICE_LOAD. If you change the set of already-installed plugins, they may no longer be compatible, and you should repeat the above verification. If the verification fails, you must address the incompatible plugins before using the SERVICE_LOAD strategy. + +Operators: Artifact Migration + +As an operator of Connect, if you discover incompatible plugins, there are multiple ways to try to resolve the incompatibility. They are listed below from most to least preferable. + + +Upgrade your incompatible plugins to the latest release version from your plugin provider. +Contact your plugin provider and request that they migrate the plugin to be compatible, following the source migration instructions, and then upgrade to the migrated version. +Migrate the plugin artifacts yourself using the included migration script. + + +The migration script is located in bin/connect-plugin-path.sh and bin\windows\connect-plugin-path.bat of your Kafka installation. The script can migrate incompatible plugin artifacts already installed on your Connect worker's plugin.path by adding or modifying JAR or resource files. This is not suitable for environments using code-signing, as this may change the artifacts such that they will fail signature verification. View the built-in help with --help. + +To perform a migration, first use the list subcommand to get an overview of the plugins available to the script. You must tell the script where to find plugins, which can be done with the repeatable --worker-config, --plugin-path, and --plugin-location arguments. 
The script will only migrate plugins present in the paths specified, so if you add plugins to your worker's classpath, then you will need to specify those plugins via one or more --plugin-location arguments. + +Once you see that all incompatible plugins are included in the listing, you can proceed to dry-run the migration with sync-manifests --dry-run. This will perform all parts of the migration, except for writing the results of the migration to disk. Note that the sync-manifests command requires all specified paths to be writable, and may alter the contents of the directories. Make a backup of the specified paths, or copy them to a writable directory. Review Comment: LGTM, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
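The SERVICE_LOAD strategy discussed in this section relies on Java's standard ServiceLoader mechanism, which looks up provider names registered in META-INF/services manifests (or module declarations) instead of reflectively scanning every class in every JAR. A minimal, self-contained sketch of that lookup, using the JDK's own FileSystemProvider service rather than a real Connect plugin:

```java
import java.nio.file.spi.FileSystemProvider;
import java.util.ServiceLoader;

public class ServiceLoadSketch {
    public static void main(String[] args) {
        // ServiceLoader consults provider manifests/module declarations rather
        // than scanning class files, which is why SERVICE_LOAD starts faster.
        ServiceLoader<FileSystemProvider> loader = ServiceLoader.load(FileSystemProvider.class);
        int count = 0;
        for (FileSystemProvider provider : loader) {
            count++;
        }
        System.out.println("registered providers: " + count);
    }
}
```

A plugin without a manifest entry is simply invisible to this lookup, which is exactly why SERVICE_LOAD "silently hides" incompatible plugins.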
[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
C0urante commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1290627899 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements [quoted Plugin Discovery section as above] +Ensure that you have a backup and the dry-run succeeds before removing the --dry-run flag and actually running the migration. If the migration fails without the --dry-run flag, then the partially migrated artifacts should be discarded. The migration is idempotent, so running it multiple times and on already-migrated plugins is safe. After the migration is completed, you should
[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
C0urante commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1290624547 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements [quoted Plugin Discovery section as above] +The script will only migrate plugins present in the paths specified, so if you add plugins to your worker's classpath, then you will need to specify those plugins via one or more --plugin-location arguments. Review Comment: Thanks, wish we didn't have this discrepancy in behavior but I'm convinced that you've thoroughly investigated and there aren't any maintainable alternatives
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
gharris1727 commented on code in PR #14064: URL: https://github.com/apache/kafka/pull/14064#discussion_r1290624348 ## tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java: ## @@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
+import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConnectPluginPath {
+
+    private static final String MANIFEST_PREFIX = "META-INF/services/";
+    public static final Object[] LIST_TABLE_COLUMNS = {
+        "pluginName",
+        "firstAlias",
+        "secondAlias",
+        "pluginVersion",
+        "pluginType",
+        "isLoadable",
+        "hasManifest",
+        "pluginLocation" // last because it is least important and most repetitive
+    };
+    public static final String NO_ALIAS = "N/A";
+
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args, System.out, System.err));
+    }
+
+    public static int mainNoExit(String[] args, PrintStream out, PrintStream err) {
+        ArgumentParser parser = parser();
+        try {
+            Namespace namespace = parser.parseArgs(args);
+            Config config = parseConfig(parser, namespace, out);
+            runCommand(config);
+            return 0;
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            return 1;
+        } catch (TerseException e) {
+            err.println(e.getMessage());
+            return 2;
+        } catch (Throwable e) {
+            err.println(e.getMessage());
+            err.println(Utils.stackTrace(e));
+            return 3;
+        }
+    }
+
+    private static ArgumentParser parser() {
+        ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-plugin-path")
+            .defaultHelp(true)
+            .description("Manage plugins on the Connect plugin.path");
+
+        ArgumentParser listCommand = parser.addSubparsers()
+            .description("List information about plugins contained within the specified plugin locations")
+            .dest("subcommand")
+            .addParser("list");
+
+        ArgumentParser[] subparsers = new ArgumentParser[] {
+            listCommand,
+        };
+
+        for
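The mainNoExit structure in the quoted code maps each failure class to a distinct exit code (0 for success, 1 for argument-parsing errors, 2 for terse user-facing errors, 3 for unexpected crashes), which keeps the tool usable from scripts. A stripped-down sketch of the same pattern, where the exception types are stand-ins rather than the real argparse4j/TerseException types:

```java
public class ExitCodes {
    // Same idea as ConnectPluginPath.mainNoExit: translate exception types
    // into distinct exit codes so callers can distinguish failure modes.
    static int run(Runnable command) {
        try {
            command.run();
            return 0;
        } catch (IllegalArgumentException e) { // stand-in for ArgumentParserException
            return 1;
        } catch (RuntimeException e) {         // stand-in for unexpected crashes
            return 3;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(() -> { }));                                       // prints 0
        System.out.println(run(() -> { throw new IllegalArgumentException(); })); // prints 1
    }
}
```

Keeping the exit logic in a non-exiting method (and calling Exit.exit only from main) also makes the tool testable without terminating the JVM.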
[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
gharris1727 commented on PR #14064: URL: https://github.com/apache/kafka/pull/14064#issuecomment-1673833354 > We may choose to report broken plugin locations (i.e., any that has one or more unloadable plugins), but IMO it's better to leave this info out entirely. We have room to maneuver in the future with this tool if we want (the public interface boundaries set by the KIP are very permissive), and I'd rather we err on the side of simplicity for now. Total/loadable/compatible plugins seems like plenty. I removed the location summaries and overall summaries. Users will need to inspect the table to figure out if everything is compatible, or run a worker verification, which is the more reliable way to find incompatibilities anyway. > I'm also worried about our alias collision reporting. Right now every invocation of the command so far has produced a ton of warnings about SMT aliases colliding. It'd be nice if we could use ReplaceField$Key as an alias instead of just Key, but until/unless that KIP gets approved, we're stuck with a fairly large number of collisions with the aliases for our OOTB transforms. I think this is the right call, since we already changed it so that the aliases in each row of the table don't depend on other rows. The alias collision logic was a red herring because I accidentally used the collision logic in the original implementation, when it really isn't relevant to the tool at all.
[GitHub] [kafka] florin-akermann opened a new pull request, #14187: KAFKA-13197: fix GlobalKTable join/left-join semantics documentation.
florin-akermann opened a new pull request, #14187: URL: https://github.com/apache/kafka/pull/14187 The documentation for join/left-join KStream-GlobalKTable is outdated: the left-hand-side key may be null. Note that the second statement mentioned in the Jira https://issues.apache.org/jira/browse/KAFKA-13197 is no longer present. Note: The constraint that the KeyValueMapper cannot return null for left joins (or else the record gets dropped) will be relaxed as part of https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
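The join semantics being documented can be modeled without Kafka: in a KStream-GlobalKTable left join, a stream record is emitted even when the table lookup misses (the right side is null), whereas an inner join drops the record on a miss. A toy sketch of those semantics using plain Java maps and hypothetical values (this is not the Streams API itself):

```java
import java.util.HashMap;
import java.util.Map;

public class GlobalJoinSketch {
    // Toy model: the "global table" is a map; lookupKey is the result of a
    // KeyValueMapper-style function applied to the stream record.
    static String leftJoin(String streamValue, String lookupKey, Map<String, String> table) {
        String tableValue = lookupKey == null ? null : table.get(lookupKey);
        return streamValue + "/" + tableValue; // right side may be null
    }

    static String innerJoin(String streamValue, String lookupKey, Map<String, String> table) {
        String tableValue = lookupKey == null ? null : table.get(lookupKey);
        return tableValue == null ? null : streamValue + "/" + tableValue; // null means dropped
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("k1", "v1");
        System.out.println(leftJoin("a", "k1", table));      // prints a/v1
        System.out.println(leftJoin("b", "missing", table)); // prints b/null: left join keeps the record
        System.out.println(innerJoin("b", "missing", table)); // prints null: inner join drops it
    }
}
```

The null-lookup-key case (modeled here as a null lookupKey yielding a null right side) is exactly the behavior KIP-962 proposes to relax for left joins.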
[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
C0urante commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1290620328 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements [quoted Plugin Discovery section as above] +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. Review Comment: LGTM, thanks!
[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5
mjsax commented on code in PR #14103: URL: https://github.com/apache/kafka/pull/14103#discussion_r1290589930 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -947,9 +947,15 @@ public String toString() { } public static class ValidString implements Validator { -final List validStrings; +// visible for testing Review Comment: I pushed a corresponding update, but I am not super happy with it... I believe the original idea to get the list of values from `CONFIG` is safer. Curious to hear what others think. In the end I am fine both ways.
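For context, the ValidString validator under discussion simply checks a configured value against a fixed list of allowed strings. A minimal stand-in (not the actual ConfigDef code; the allowed values here are only examples):

```java
import java.util.Arrays;
import java.util.List;

public class ValidStringSketch {
    // Minimal stand-in for ConfigDef.ValidString: accepts only values from a
    // fixed list, the way upgrade.from values are validated.
    final List<String> validStrings;

    ValidStringSketch(String... values) {
        this.validStrings = Arrays.asList(values);
    }

    boolean isValid(Object value) {
        return value instanceof String && validStrings.contains(value);
    }

    public static void main(String[] args) {
        ValidStringSketch validator = new ValidStringSketch("3.4", "3.5");
        System.out.println(validator.isValid("3.4")); // prints true
        System.out.println(validator.isValid("9.9")); // prints false
    }
}
```

Deriving validStrings from the ConfigDef itself (the "get the list of values from `CONFIG`" idea in the comment) keeps the validator and the documented values from drifting apart.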
[GitHub] [kafka] kirktrue commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
kirktrue commented on PR #14156: URL: https://github.com/apache/kafka/pull/14156#issuecomment-1673817790 @gharris1727 Thanks for your work on this! Hopefully it will help KAFKA-15197 too!
[GitHub] [kafka] kirktrue commented on pull request #13908: KAFKA-15052 Fix the flaky testBalancePartitionLeaders - part II
kirktrue commented on PR #13908: URL: https://github.com/apache/kafka/pull/13908#issuecomment-1673815636 @divijvaidya Can you review this with an eye to merging it? Thanks!
[GitHub] [kafka] gharris1727 commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service
gharris1727 commented on PR #14092: URL: https://github.com/apache/kafka/pull/14092#issuecomment-1673813076 System test failures appear to be due to reasons other than classloading/dependency issues. The producer performance tests appear to work normally.
```
tests/kafkatest/tests/core/zookeeper_migration_test.py::TestMigration.test_pre_migration_mode_3_4@{"metadata_quorum":"ISOLATED_KRAFT"}
tests/kafkatest/tests/core/zookeeper_migration_test.py::TestMigration.test_upgrade_after_3_4_migration
tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_PLAINTEXT"}
tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_SSL"}
tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"ISOLATED_KRAFT"}
tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"ZK"}
tests/kafkatest/tests/streams/streams_smoke_test.py::StreamsSmokeTest.test_streams@{"processing_guarantee":"at_least_once","crash":false,"metadata_quorum":"ISOLATED_KRAFT"}
tests/kafkatest/tests/core/downgrade_test.py::TestDowngrade.test_upgrade_and_downgrade@{"version":"2.7.1","compression_types":["none"],"static_membership":false}
tests/kafkatest/tests/core/downgrade_test.py::TestDowngrade.test_upgrade_and_downgrade@{"version":"2.7.1","compression_types":["none"],"static_membership":true}
tests/kafkatest/tests/core/downgrade_test.py::TestDowngrade.test_upgrade_and_downgrade@{"version":"2.7.1","compression_types":["zstd"],"security_protocol":"SASL_SSL"}
tests/kafkatest/tests/streams/streams_broker_bounce_test.py::StreamsBrokerBounceTest.test_broker_type_bounce@{"failure_mode":"hard_shutdown","broker_type":"controller","num_threads":1,"sleep_time_secs":120,"metadata_quorum":"ZK"}
tests/kafkatest/tests/streams/streams_broker_bounce_test.py::StreamsBrokerBounceTest.test_broker_type_bounce@{"failure_mode":"hard_shutdown","broker_type":"leader","num_threads":1,"sleep_time_secs":120,"metadata_quorum":"ZK"}
tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces@{"from_version":"3.1.2","to_version":"3.6.0-SNAPSHOT"}
tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces@{"from_version":"3.2.3","to_version":"3.6.0-SNAPSHOT"}
tests/kafkatest/tests/tools/replica_verification_test.py::ReplicaVerificationToolTest.test_replica_lags@{"metadata_quorum":"ISOLATED_KRAFT"}
tests/kafkatest/tests/tools/replica_verification_test.py::ReplicaVerificationToolTest.test_replica_lags@{"metadata_quorum":"ZK"}
tests/kafkatest/tests/core/network_degrade_test.py::NetworkDegradeTest.test_rate@{"task_name":"rate-1000-latency-50","device_name":"eth0","latency_ms":50,"rate_limit_kbit":100}
```
[GitHub] [kafka] C0urante commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
C0urante commented on code in PR #14064: URL: https://github.com/apache/kafka/pull/14064#discussion_r1290570863 ## tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java: ## @@ -0,0 +1,559 @@ [quoted ConnectPluginPath.java excerpt as above]
[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5
mjsax commented on code in PR #14103: URL: https://github.com/apache/kafka/pull/14103#discussion_r1290575646 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -947,9 +947,15 @@ public String toString() { } public static class ValidString implements Validator { -final List validStrings; +// visible for testing Review Comment: This part does not work: `public static final String UPGRADE_FROM_34 = UpgradeFromValues.UPGRADE_FROM_34.toString();` because we use ``` switch ((String) upgradeFrom) { case StreamsConfig.UPGRADE_FROM_0100: ... } ``` And the compiler complain that `StreamsConfig.UPGRADE_FROM_0100` must be a const ``` error: constant string expression required case StreamsConfig.UPGRADE_FROM_0100: ``` So not sure if we gain much adding the enum? -- We would still need to remember update the enum, and thus have a "split brain" problem -- getting the List out of the `CONFIG` object seems to be safer. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5
mjsax commented on code in PR #14103: URL: https://github.com/apache/kafka/pull/14103#discussion_r1290575646 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -947,9 +947,15 @@ public String toString() { } public static class ValidString implements Validator { -final List validStrings; +// visible for testing Review Comment: This part does not work: `public static final String UPGRADE_FROM_34 = UpgradeFromValues.UPGRADE_FROM_34.toString();` because we use ``` switch ((String) upgradeFrom) { case StreamsConfig.UPGRADE_FROM_0100: ... } ``` And the compiler complain that `StreamsConfig.UPGRADE_FROM_0100` must be a const ``` error: constant string expression required case StreamsConfig.UPGRADE_FROM_0100: ``` So not sure if we gain much adding the enum? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
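The constant-case-label restriction mjsax hits can be reproduced in a few lines. This is an illustrative stand-in (class and constant names are hypothetical, not the actual StreamsConfig code): per the Java Language Specification, only compile-time constant expressions may appear as `case` labels in a `switch` over a `String`, so a `static final String` initialized via a method call does not qualify.

```java
public class UpgradeFromSwitchDemo {

    // A compile-time constant expression: legal as a case label.
    public static final String UPGRADE_FROM_34 = "3.4";

    // NOT a constant expression (initialized via a method call). Writing
    // `case UPGRADE_FROM_34_DERIVED:` below would fail to compile with
    // "error: constant string expression required" -- the problem described
    // in the review comment.
    public static final String UPGRADE_FROM_34_DERIVED = String.valueOf(3.4);

    public static String describe(String upgradeFrom) {
        switch (upgradeFrom) {
            case UPGRADE_FROM_34:  // legal: literal-initialized constant
                return "upgrading from 3.4";
            default:
                return "unknown upgrade path";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("3.4"));
    }
}
```

This is why deriving the string constants from an enum's `toString()` breaks the existing `switch` statements, even though the values are identical at runtime.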
[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5
mjsax commented on code in PR #14103: URL: https://github.com/apache/kafka/pull/14103#discussion_r1290555860 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -947,9 +947,15 @@ public String toString() { } public static class ValidString implements Validator { -final List validStrings; +// visible for testing Review Comment: This does not really help, because in the end we need a `List` of config values -- the source of truth for which config values we support is inside the `CONFIG` object, so we need to get it from there IMHO. If we don't want to tap into the `CONFIG` object, we can maintain a list ourselves (but we don't need an `enum` for it), but I think it defeats the purpose of the test, as we again have a "split brain" problem for which we need to keep the `CONFIG` and our manually maintained list in sync.
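The "single source of truth" approach the comment argues for can be sketched as a standalone model. This is illustrative only, not Kafka's actual `ConfigDef`/`ValidString` code: the validator itself exposes its accepted values, so a test reads the list straight from the config definition instead of keeping a second, manually synced copy.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ValidStringDemo {

    // Simplified stand-in for a ConfigDef-style string validator.
    public static class ValidString {
        final List<String> validStrings;  // visible for testing

        ValidString(List<String> validStrings) {
            this.validStrings = validStrings;
        }

        public static ValidString in(String... validStrings) {
            return new ValidString(Arrays.asList(validStrings));
        }

        // Rejects any value not in the accepted list.
        public void ensureValid(String name, Object value) {
            if (!validStrings.contains(value)) {
                throw new IllegalArgumentException("Invalid value " + value
                    + " for configuration " + name + ": must be one of " + validStrings);
            }
        }

        // Tests iterate this instead of a hand-maintained duplicate list,
        // avoiding the "split brain" problem the comment describes.
        public List<String> validStrings() {
            return Collections.unmodifiableList(validStrings);
        }
    }

    public static void main(String[] args) {
        ValidString upgradeFrom = ValidString.in("0.10.0", "3.4", "3.5");
        upgradeFrom.ensureValid("upgrade.from", "3.4");  // passes silently
        System.out.println(upgradeFrom.validStrings());
    }
}
```

With this shape, adding a new supported value in one place automatically updates both validation and any test that enumerates the values.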
[jira] [Created] (KAFKA-15333) Flaky build failure throwing Connect Exception: Could not connect to server....
Philip Nee created KAFKA-15333: -- Summary: Flaky build failure throwing Connect Exception: Could not connect to server Key: KAFKA-15333 URL: https://issues.apache.org/jira/browse/KAFKA-15333 Project: Kafka Issue Type: Test Components: connect, unit tests Reporter: Philip Nee We frequently observe flaky build failures with the following message. This is from the most recent PR, post 3.5.0:
{code:java}
> Task :generator:testClasses UP-TO-DATE
Unexpected exception thrown.
org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:38354'.
	at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:94)
	at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
	at org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException
	at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:72)
	at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52)
	at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:81)
	... 6 more

> Task :streams:upgrade-system-tests-26:unitTest
org.gradle.internal.remote.internal.ConnectException: Could not connect to server [3156f144-9a89-4c47-91ad-88a8378ec726 port:37889, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:122)
	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
	... 5 more
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #14138: Fix a race when query isUnderMinIsr
jolshan commented on code in PR #14138: URL: https://github.com/apache/kafka/pull/14138#discussion_r1290546194 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -798,6 +803,12 @@ class Partition(val topicPartition: TopicPartition, // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionState.controllerEpoch + val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch Review Comment: Can we add a bit more detail in the description about why we moved this to be before updateAssignmentAndIsr?
[GitHub] [kafka] C0urante commented on pull request #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build
C0urante commented on PR #14186: URL: https://github.com/apache/kafka/pull/14186#issuecomment-1673671924 I've opened this as a draft request to demonstrate that unused stubbings will be successfully reported with the proposed change to the Jenkinsfile. Once the build fails successfully, I'll revert the commit that induces failure and mark this ready for review.
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290483692 ## docs/streams/developer-guide/config-streams.html: ## @@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization. + +rack.aware.assignment.non_overlap_cost + + + + This configuration sets the cost of moving a task from the original assignment computed either by StickyTaskAssignor or + HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost, + they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.traffic_cost, + the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting + rack.aware.assignment.non_overlap_cost to 10 and rack.aware.assignment.traffic_cost to 1 is more likely to maintain existing assignment than setting + rack.aware.assignment.non_overlap_cost to 100 and rack.aware.assignment.traffic_cost to 50. + + + The default value is null which means default non_overlap_cost in different assignors will be used. In StickyTaskAssignor, it has a higher default value than rack.aware.assignment.traffic_cost which means + maintaining stickiness is preferred in StickyTaskAssignor. In HighAvailabilityTaskAssignor, it has a lower default value than rack.aware.assignment.traffic_cost Review Comment: > it has a lower default value As above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] cmccabe merged pull request #14050: KAFKA-15220: Do not returned fenced brokers from getAliveBrokerNode
cmccabe merged PR #14050: URL: https://github.com/apache/kafka/pull/14050
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290482358 ## docs/streams/developer-guide/config-streams.html: ## @@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization. + +rack.aware.assignment.non_overlap_cost + + + + This configuration sets the cost of moving a task from the original assignment computed either by StickyTaskAssignor or + HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost, + they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.traffic_cost, + the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting + rack.aware.assignment.non_overlap_cost to 10 and rack.aware.assignment.traffic_cost to 1 is more likely to maintain existing assignment than setting + rack.aware.assignment.non_overlap_cost to 100 and rack.aware.assignment.traffic_cost to 50. + + + The default value is null which means default non_overlap_cost in different assignors will be used. In StickyTaskAssignor, it has a higher default value than rack.aware.assignment.traffic_cost which means Review Comment: put `non_overlap_cost` into code markup ## docs/streams/architecture.html: ## @@ -167,6 +167,14 @@ Kafka Streams Developer Guide section. + +There is also a client side config client.rack which can set the rack for a Kafka Consumer. If broker side also have rack set via broker.rack. Then rack aware task Review Comment: ```suggestion There is also a client config client.rack which can set the rack for a Kafka consumer. 
If brokers also have their rack set via broker.rack, then rack aware task ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante opened a new pull request, #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build
C0urante opened a new pull request, #14186: URL: https://github.com/apache/kafka/pull/14186 [Jira](https://issues.apache.org/jira/browse/KAFKA-14682) Our Jenkinsfile is currently configured to do testing with the `unitTest` and `integrationTest` tasks, instead of the `test` task. These tasks use JUnit test category filters to distinguish between integration and non-integration tests. Because of https://github.com/mockito/mockito/issues/3077, if a JUnit test category filter is used, Mockito does not report unused stubbings. As a result, our CI builds currently don't catch unused stubbings. Although an upstream fix has been implemented with https://github.com/mockito/mockito/pull/3078, a release has not yet been published with that fix. Even when a new release is published, it will take place on the 5.x.y line, which requires JDK 11+ (see the [5.0.0 release notes](https://github.com/mockito/mockito/releases/tag/v5.0.0)). As a workaround, we can switch to using the `test` task instead of the `unitTest` and `integrationTest` tasks. If there are benefits to the two-task approach, we can revert to that approach once we begin work on Kafka 4.0 (which drops support for Java 8) and upgrade to a version of Mockito with the fix. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15332) Eligible Leader Replicas
Calvin Liu created KAFKA-15332: -- Summary: Eligible Leader Replicas Key: KAFKA-15332 URL: https://issues.apache.org/jira/browse/KAFKA-15332 Project: Kafka Issue Type: New Feature Reporter: Calvin Liu A root ticket for the KIP-966
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290485321 ## docs/streams/developer-guide/config-streams.html: ## @@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization. + +rack.aware.assignment.non_overlap_cost + + + + This configuration sets the cost of moving a task from the original assignment computed either by StickyTaskAssignor or + HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost, + they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.traffic_cost, + the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting + rack.aware.assignment.non_overlap_cost to 10 and rack.aware.assignment.traffic_cost to 1 is more likely to maintain existing assignment than setting + rack.aware.assignment.non_overlap_cost to 100 and rack.aware.assignment.traffic_cost to 50. + + + The default value is null which means default non_overlap_cost in different assignors will be used. In StickyTaskAssignor, it has a higher default value than rack.aware.assignment.traffic_cost which means + maintaining stickiness is preferred in StickyTaskAssignor. In HighAvailabilityTaskAssignor, it has a lower default value than rack.aware.assignment.traffic_cost + which means minimizing cross rack traffic is preferred in HighAvailabilityTaskAssignor. + + + + + +rack.aware.assignment.strategy + + + + This configuration sets the strategy Kafka Streams can use for rack aware task assignment so that cross traffic from broker to client can be reduced. 
This config will only take effect when broker.rack Review Comment: ```suggestion This configuration sets the strategy Kafka Streams uses for rack aware task assignment so that cross traffic from broker to client can be reduced. This config will only take effect when broker.rack ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290489521 ## docs/streams/developer-guide/config-streams.html: ## @@ -718,6 +760,24 @@ rack.aware.assignment.tags +rack.aware.assignment.traffic_cost + + + + This configuration sets the cost of cross rack traffic. Together with rack.aware.assignment.non_overlap_cost, + they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.non_overlap_cost, + the optimizer will try to compute an assignment which minimize the cross rack traffic. Note that the optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting + rack.aware.assignment.traffic_cost to 10 and rack.aware.assignment.non_overlap_cost to 1 is more likely to minimize cross rack traffic than setting + rack.aware.assignment.traffic_cost to 100 and rack.aware.assignment.non_overlap_cost to 50. + + + The default value is null which means default traffic cost in different assignors will be used. In StickyTaskAssignor, it has a lower default value than rack.aware.assignment.non_overlap_cost. + In HighAvailabilityTaskAssignor, it has a higher default value than rack.aware.assignment.non_overlap_cost. Review Comment: Include default value? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290483313 ## docs/streams/developer-guide/config-streams.html: ## @@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization. + +rack.aware.assignment.non_overlap_cost + + + + This configuration sets the cost of moving a task from the original assignment computed either by StickyTaskAssignor or + HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost, + they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.traffic_cost, + the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting + rack.aware.assignment.non_overlap_cost to 10 and rack.aware.assignment.traffic_cost to 1 is more likely to maintain existing assignment than setting + rack.aware.assignment.non_overlap_cost to 100 and rack.aware.assignment.traffic_cost to 50. + + + The default value is null which means default non_overlap_cost in different assignors will be used. In StickyTaskAssignor, it has a higher default value than rack.aware.assignment.traffic_cost which means Review Comment: > it has a higher default value Should we include the value? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
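The ratio behavior described in the quoted docs can be illustrated with back-of-envelope arithmetic. The cost formula below is a simplified sketch of a weighted objective, not the assignor's actual implementation: treating a candidate assignment's score as `non_overlap_cost * movedTasks + traffic_cost * crossRackTraffic` shows why (10, 1) penalizes task movement 10x as hard as cross-rack traffic, while (100, 50) penalizes it only 2x as hard, making the former more likely to keep the existing assignment.

```java
public class RackAwareCostDemo {

    // Simplified weighted objective: lower is better.
    public static long cost(long nonOverlapCost, long trafficCost,
                            long movedTasks, long crossRackTraffic) {
        return nonOverlapCost * movedTasks + trafficCost * crossRackTraffic;
    }

    public static void main(String[] args) {
        // Two hypothetical candidates: "sticky" moves 0 tasks but keeps 8 units
        // of cross-rack traffic; "moved" relocates 2 tasks to cut traffic to 1.
        long sticky10 = cost(10, 1, 0, 8);   // 8
        long moved10  = cost(10, 1, 2, 1);   // 21 -> stickiness wins at (10, 1)
        long sticky50 = cost(100, 50, 0, 8); // 400
        long moved50  = cost(100, 50, 2, 1); // 250 -> moving wins at (100, 50)
        System.out.println(sticky10 + " " + moved10 + " " + sticky50 + " " + moved50);
    }
}
```

Both configurations favor stickiness in absolute terms (non_overlap_cost > traffic_cost), but only the ratio decides which candidate wins, matching the docs' point that (10, 1) is more likely to maintain the existing assignment than (100, 50).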
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290445751 ## docs/streams/architecture.html: ## @@ -167,6 +167,14 @@ Kafka Streams Developer Guide section. + +There is also a client side config client.rack which can set the rack for a Kafka Consumer. If broker side also have rack set via broker.rack. Then rack aware task +assignment can be enabled to compute a task assignment which can reduce cross rack traffic by try to assign tasks to clients with the same rack. See rack.aware.assignment.strategy in +Kafka Streams Developer Guide. +Note that client.rack can also be used to distribute standby tasks on different "rack" from the active ones, which has a similar functionality as rack.aware.assignment.tags. +Currently, rack.aware.assignment.tag takes precedence in distributing standby tasks which means if both configs present, rack.aware.assignment.tag will be used for distributing +standby tasks on different "rack" from the active ones because it can configure more tag keys. Review Comment: ```suggestion standby tasks on different racks from the active ones because it can configure more tag keys. ``` ## docs/streams/architecture.html: ## @@ -167,6 +167,14 @@ Kafka Streams Developer Guide section. + +There is also a client side config client.rack which can set the rack for a Kafka Consumer. If broker side also have rack set via broker.rack. Then rack aware task +assignment can be enabled to compute a task assignment which can reduce cross rack traffic by try to assign tasks to clients with the same rack. See rack.aware.assignment.strategy in +Kafka Streams Developer Guide. Review Comment: Drop this line ## docs/streams/architecture.html: ## @@ -167,6 +167,14 @@ Kafka Streams Developer Guide section. + +There is also a client side config client.rack which can set the rack for a Kafka Consumer. If broker side also have rack set via broker.rack. 
Then rack aware task +assignment can be enabled to compute a task assignment which can reduce cross rack traffic by try to assign tasks to clients with the same rack. See rack.aware.assignment.strategy in Review Comment: ```suggestion assignment can be enabled via rack.aware.assignment.strategy (cf. Kafka Streams Developer Guide) to compute a task assignment which can reduce cross rack traffic by trying to assign tasks to clients with the same rack. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290489338 ## docs/streams/developer-guide/config-streams.html: ## @@ -718,6 +760,24 @@ rack.aware.assignment.tags +rack.aware.assignment.traffic_cost + + + + This configuration sets the cost of cross rack traffic. Together with rack.aware.assignment.non_overlap_cost, + they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.non_overlap_cost, + the optimizer will try to compute an assignment which minimize the cross rack traffic. Note that the optimizer takes the ratio of these two configs into consideration of favoring maintaining existing assignment or minimizing traffic cost. For example, setting + rack.aware.assignment.traffic_cost to 10 and rack.aware.assignment.non_overlap_cost to 1 is more likely to minimize cross rack traffic than setting + rack.aware.assignment.traffic_cost to 100 and rack.aware.assignment.non_overlap_cost to 50. + + + The default value is null which means default traffic cost in different assignors will be used. In StickyTaskAssignor, it has a lower default value than rack.aware.assignment.non_overlap_cost. Review Comment: As above: include default value? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290445388 ## docs/streams/architecture.html: ## @@ -167,6 +167,14 @@ Kafka Streams Developer Guide section. + +There is also a client side config client.rack which can set the rack for a Kafka Consumer. If broker side also have rack set via broker.rack. Then rack aware task +assignment can be enabled to compute a task assignment which can reduce cross rack traffic by try to assign tasks to clients with the same rack. See rack.aware.assignment.strategy in +Kafka Streams Developer Guide. +Note that client.rack can also be used to distribute standby tasks on different "rack" from the active ones, which has a similar functionality as rack.aware.assignment.tags. Review Comment: ```suggestion Note that client.rack can also be used to distribute standby tasks to different racks from the active ones, which has a similar functionality as rack.aware.assignment.tags. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor
mjsax commented on code in PR #14181: URL: https://github.com/apache/kafka/pull/14181#discussion_r1290486200 ## docs/streams/developer-guide/config-streams.html: ## @@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization. + +rack.aware.assignment.non_overlap_cost + + + + This configuration sets the cost of moving a task from the original assignment computed either by StickyTaskAssignor or + HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost, + they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.traffic_cost, + the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer takes the ratio of these two configs into consideration when deciding between maintaining the existing assignment and minimizing traffic cost. For example, setting + rack.aware.assignment.non_overlap_cost to 10 and rack.aware.assignment.traffic_cost to 1 is more likely to maintain the existing assignment than setting + rack.aware.assignment.non_overlap_cost to 100 and rack.aware.assignment.traffic_cost to 50. + + + The default value is null, which means the default non_overlap_cost in different assignors will be used. In StickyTaskAssignor, it has a higher default value than rack.aware.assignment.traffic_cost, which means + maintaining stickiness is preferred in StickyTaskAssignor. In HighAvailabilityTaskAssignor, it has a lower default value than rack.aware.assignment.traffic_cost, + which means minimizing cross rack traffic is preferred in HighAvailabilityTaskAssignor. + + + + + +rack.aware.assignment.strategy + + + + This configuration sets the strategy Kafka Streams can use for rack aware task assignment so that cross traffic from broker to client can be reduced. 
This config will only take effect when broker.rack + is set on broker side and client.rack is set on Kafka Streams side. There are two settings for this config: Review Comment: > is set on broker side Either "is set on the brokers" or "is set broker side" Also for `is set on Kafka Streams side`
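The prerequisite wiring described above can be sketched as follows. The `consumer.` prefix for passing configs through to Kafka Streams' embedded consumers is standard, but the strategy value `"min_traffic"` is taken from KIP-925 and should be confirmed against your release; the rack name and bootstrap server are hypothetical.

```java
import java.util.Properties;

public class RackAwareStrategyConfig {
    public static void main(String[] args) {
        // Broker side (server.properties, not shown here): broker.rack=rack-a
        // Client side (Kafka Streams):
        Properties props = new Properties();
        props.put("application.id", "rack-aware-demo");    // hypothetical
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical
        // Streams forwards consumer.* configs to its embedded consumers,
        // which is where client.rack must land for rack aware assignment.
        props.put("consumer.client.rack", "rack-a");
        // "min_traffic" (per KIP-925) enables the cross-rack traffic optimizer;
        // the default, "none", leaves rack aware assignment disabled.
        props.put("rack.aware.assignment.strategy", "min_traffic");
        System.out.println(props.getProperty("rack.aware.assignment.strategy"));
    }
}
```

If either `broker.rack` or `client.rack` is missing, the strategy config has no effect, per the doc text under review.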
[GitHub] [kafka] gharris1727 merged pull request #14159: Kafka 15291 connect plugins implement versioned
gharris1727 merged PR #14159: URL: https://github.com/apache/kafka/pull/14159
[jira] [Resolved] (KAFKA-15291) Implement Versioned interfaces in common Connect plugins
[ https://issues.apache.org/jira/browse/KAFKA-15291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15291. - Resolution: Fixed > Implement Versioned interfaces in common Connect plugins > > > Key: KAFKA-15291 > URL: https://issues.apache.org/jira/browse/KAFKA-15291 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Greg Harris >Assignee: Aindriú Lavelle >Priority: Major > Fix For: 3.6.0 > > > In KAFKA-14863, we changed the plugin scanning logic to allow plugins to > opt-in to the Versioned interface individually, when previously it was > limited to Connector plugins. > To take advantage of this change, we should have all of the plugins built via > the Kafka repository opt-in, and provide the environment's Kafka version from > the AppInfoParser.getVersion(). > See the FileStreamSinkConnector as an example of the version() method > implementation. > All subclasses of Converter, HeaderConverter, Transformation, Predicate, and > ConnectorClientConfigOverridePolicy should implement Versioned. The > interfaces themselves will _not_ extend Versioned, as that would be a > backwards-incompatible change. -- This message was sent by Atlassian Jira (v8.20.10#820010)
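The opt-in pattern the ticket describes can be sketched as below. `Versioned` and `AppInfoParser.getVersion()` are real Connect/clients APIs, but this sketch defines local stand-ins so it is self-contained, and `ToyTransformation` plus the `"3.6.0"` version string are purely illustrative.

```java
// Stand-in mirroring org.apache.kafka.connect.components.Versioned,
// defined locally so this sketch compiles without the Connect jars.
interface Versioned {
    String version();
}

public class VersionedTransformationSketch {
    // Stand-in for AppInfoParser.getVersion(), which in Kafka reports the
    // version baked into the clients jar at build time.
    static String appInfoVersion() {
        return "3.6.0"; // hypothetical version
    }

    // A Connect plugin (Converter, Transformation, Predicate, ...) opts in
    // simply by implementing Versioned on the concrete class -- the plugin
    // interfaces themselves do NOT extend Versioned, preserving compatibility.
    static class ToyTransformation implements Versioned {
        @Override
        public String version() {
            return appInfoVersion();
        }
    }

    public static void main(String[] args) {
        System.out.println(new ToyTransformation().version()); // prints "3.6.0"
    }
}
```

Keeping `Versioned` off the base interfaces is the key design choice: third-party plugins compiled against older interfaces keep working unchanged.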
[GitHub] [kafka] gharris1727 commented on pull request #14159: Kafka 15291 connect plugins implement versioned
gharris1727 commented on PR #14159: URL: https://github.com/apache/kafka/pull/14159#issuecomment-1673635408 Flaky test failures appear unrelated, and tests pass locally.
[GitHub] [kafka] mimaison commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools
mimaison commented on code in PR #13204: URL: https://github.com/apache/kafka/pull/13204#discussion_r1290277580 ## tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java: ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.collection.JavaConverters; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@SuppressWarnings("deprecation") +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3) +@Tag("integration") +public class LeaderElectionCommandTest { +private final ClusterInstance cluster; +int broker1 = 0; +int broker2 = 1; +int broker3 = 2; + +public LeaderElectionCommandTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@BeforeEach +void setup(ClusterConfig clusterConfig) { +TestUtils.verifyNoUnexpectedThreads("@BeforeEach"); + 
clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false"); + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true"); + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1"); + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000"); + clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2"); +} + +@ClusterTest +public void testAllTopicPartition() throws InterruptedException, ExecutionException { +String topic = "unclean-topic"; +int partition = 0; +List assignment = Arrays.asList(broker2, broker3); + +cluster.waitForReadyBrokers(); +Admin client = cluster.createAdminClient(); +Map> partitionAssignment = new HashMap<>(); +partitionAssignment.put(partition, assignment); + +createTopic(client, topic, partitionAssignment); + +TopicPartition topicPartition = new TopicPartition(topic, partition); + +TestUtils.assertLeader(client, topicPartition, broker2); +cluster.shutdownBroker(broker3); +TestUtils.waitForBrokersOutOfIsr(client, + JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(), + JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet() +); +cluster.shutdownBroker(broker2); +TestUtils.assertNoLeader(client, topicPartition); +cluster.startBroker(broker3); +TestUtils.waitForOnlineBroker(client, broker3); + +LeaderElectionCommand.main( +new String[] { Review Comment: Since `main()` takes a varargs we don't need to create an array here. Same below ## tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java: ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache
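The reviewer's point is that a `String...` parameter accepts both an explicit array and a bare argument list, so the `new String[] {...}` wrapper is redundant. A minimal illustration, where `run` is a hypothetical stand-in for the command's varargs entry point:

```java
public class VarargsCallDemo {
    // Hypothetical entry point declared with varargs, standing in for the
    // command's main(String... args).
    static int run(String... args) {
        return args.length;
    }

    public static void main(String[] args) {
        // Both call styles compile to the same invocation:
        int viaArray = run(new String[] {"--bootstrap-server", "localhost:9092"});
        int viaVarargs = run("--bootstrap-server", "localhost:9092");
        System.out.println(viaArray == viaVarargs); // prints "true"
    }
}
```

Note this only works if the callee declares `String...`; a plain `String[]` parameter still requires the array wrapper at the call site.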
[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
kamalcph commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1290377829 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -343,21 +345,78 @@ public void onLeadershipChange(Set partitionsBecomeLeader, /** * Deletes the internal topic partition info if delete flag is set as true. * - * @param topicPartition topic partition to be stopped. + * @param topicPartitions topic partitions that needs to be stopped. * @param delete flag to indicate whether the given topic partitions to be deleted or not. */ -public void stopPartitions(TopicPartition topicPartition, boolean delete) { +public void stopPartitions(Set topicPartitions, + boolean delete, + BiConsumer errorHandler) { +LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete); +Set topicIdPartitions = topicPartitions.stream() +.filter(topicIdByPartitionMap::containsKey) +.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp)) +.collect(Collectors.toSet()); + +topicIdPartitions.forEach(tpId -> { +try { +RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId); +if (task != null) { +LOGGER.info("Cancelling the RLM task for tpId: {}", tpId); +task.cancel(); +} +if (delete) { +LOGGER.info("Deleting the remote log segments task for partition: {}", tpId); +deleteRemoteLogPartition(tpId); +} +} catch (Exception ex) { +errorHandler.accept(tpId.topicPartition(), ex); +} +}); + if (delete) { -// Delete from internal datastructures only if it is to be deleted. 
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition); -LOGGER.debug("Removed partition: {} from topicPartitionIds", topicIdPartition); +// NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted +remoteLogMetadataManager.onStopPartitions(topicIdPartitions); +topicPartitions.forEach(topicIdByPartitionMap::remove); +} +} + +private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteStorageException, ExecutionException, InterruptedException { +List metadataList = new ArrayList<>(); + remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add); + +List deleteSegmentStartedEvents = metadataList.stream() +.map(metadata -> +new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), time.milliseconds(), +RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)) +.collect(Collectors.toList()); +publishEvents(deleteSegmentStartedEvents).get(); + +// KAFKA-15166: Instead of deleting the segment one by one. If the underlying RemoteStorageManager supports deleting +// a partition, then we can call that API instead. Review Comment: Done. Updated the comment to track the KAFKA-15313 ticket. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
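The deletion flow in the diff above — publish DELETE_SEGMENT_STARTED for every segment up front, then delete data segment by segment — is a two-phase pattern that lets a restarted broker resume cleanup from the metadata. A simplified sketch with in-memory stand-ins (the enum values echo `RemoteLogSegmentState`, but the maps and lists here are illustrative, not the real RLMM/RSM APIs):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseSegmentDelete {
    enum State { COPY_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    public static void main(String[] args) {
        // Stand-in metadata store: segment id -> lifecycle state.
        Map<String, State> metadata = new HashMap<>();
        metadata.put("segment-1", State.COPY_FINISHED);
        metadata.put("segment-2", State.COPY_FINISHED);
        // Stand-in remote storage holding the segment data.
        List<String> remoteStorage = new ArrayList<>(metadata.keySet());

        // Phase 1: record the delete intent for ALL segments before touching
        // any data, so a crash mid-delete leaves a recoverable trail.
        metadata.replaceAll((id, state) -> State.DELETE_SEGMENT_STARTED);

        // Phase 2: delete data one segment at a time, finalizing each.
        for (String id : new ArrayList<>(remoteStorage)) {
            remoteStorage.remove(id);
            metadata.put(id, State.DELETE_SEGMENT_FINISHED);
        }
        System.out.println(remoteStorage.isEmpty()); // prints "true"
    }
}
```

The KAFKA-15166/KAFKA-15313 follow-up mentioned in the diff would replace the per-segment loop with a single bulk partition delete where the `RemoteStorageManager` supports one.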
[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
kamalcph commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1290376710 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -343,21 +345,78 @@ public void onLeadershipChange(Set partitionsBecomeLeader, /** * Deletes the internal topic partition info if delete flag is set as true. * - * @param topicPartition topic partition to be stopped. + * @param topicPartitions topic partitions that need to be stopped. * @param delete flag to indicate whether the given topic partitions to be deleted or not. */ -public void stopPartitions(TopicPartition topicPartition, boolean delete) { +public void stopPartitions(Set topicPartitions, Review Comment: RemoteIndexCache does not maintain the indexes by partition but by segment UUID. Traversing the cache to remove those indexes would be expensive. Since the `RemoteIndexCache` is an LRU cache, those indexes will be evicted eventually.
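The reviewer's argument — skip eager purging on partition delete because LRU eviction reclaims the entries anyway — rests on standard access-ordered LRU behavior. A minimal sketch using `LinkedHashMap` (the keys stand in for segment UUIDs; this only illustrates the eviction idea, not the real `RemoteIndexCache` implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruEvictionSketch {
    public static void main(String[] args) {
        int maxEntries = 2;
        // Access-ordered LinkedHashMap that evicts the least recently used
        // entry once the cache grows past maxEntries.
        Map<String, String> cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        };
        cache.put("uuid-1", "index-data");
        cache.put("uuid-2", "index-data");
        cache.get("uuid-1");              // touch: uuid-1 becomes most recent
        cache.put("uuid-3", "index-data"); // evicts uuid-2, the least recent
        System.out.println(cache.containsKey("uuid-2")); // prints "false"
    }
}
```

The trade-off the reviewer accepts: entries for deleted partitions linger briefly, but no expensive full-cache traversal is needed on the delete path.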
[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
kamalcph commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1290369199 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -343,21 +345,78 @@ public void onLeadershipChange(Set partitionsBecomeLeader, /** * Deletes the internal topic partition info if delete flag is set as true. * - * @param topicPartition topic partition to be stopped. + * @param topicPartitions topic partitions that needs to be stopped. * @param delete flag to indicate whether the given topic partitions to be deleted or not. */ -public void stopPartitions(TopicPartition topicPartition, boolean delete) { +public void stopPartitions(Set topicPartitions, + boolean delete, + BiConsumer errorHandler) { +LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete); +Set topicIdPartitions = topicPartitions.stream() +.filter(topicIdByPartitionMap::containsKey) +.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp)) +.collect(Collectors.toSet()); + +topicIdPartitions.forEach(tpId -> { +try { +RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId); +if (task != null) { +LOGGER.info("Cancelling the RLM task for tpId: {}", tpId); +task.cancel(); +} +if (delete) { +LOGGER.info("Deleting the remote log segments task for partition: {}", tpId); +deleteRemoteLogPartition(tpId); +} +} catch (Exception ex) { +errorHandler.accept(tpId.topicPartition(), ex); +} +}); + if (delete) { -// Delete from internal datastructures only if it is to be deleted. 
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition); -LOGGER.debug("Removed partition: {} from topicPartitionIds", topicIdPartition); +// NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted +remoteLogMetadataManager.onStopPartitions(topicIdPartitions); +topicPartitions.forEach(topicIdByPartitionMap::remove); +} +} + +private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteStorageException, ExecutionException, InterruptedException { +List metadataList = new ArrayList<>(); + remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add); + +List deleteSegmentStartedEvents = metadataList.stream() +.map(metadata -> +new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), time.milliseconds(), +RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)) +.collect(Collectors.toList()); +publishEvents(deleteSegmentStartedEvents).get(); + +// KAFKA-15166: Instead of deleting the segment one by one. If the underlying RemoteStorageManager supports deleting +// a partition, then we can call that API instead. +for (RemoteLogSegmentMetadata metadata: metadataList) { +remoteLogStorageManager.deleteLogSegmentData(metadata); Review Comment: It's already mentioned in the javadoc so not repeating the same here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
kamalcph commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1290366372 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig, } partitionsToDelete += topicPartition } + if (stopPartition.deleteRemoteLog) +remotePartitionsToDelete += topicPartition + // If we were the leader, we may have some operations still waiting for completion. // We force completion to prevent them from timing out. completeDelayedFetchOrProduceRequests(topicPartition) } // Third delete the logs and checkpoint. val errorMap = new mutable.HashMap[TopicPartition, Throwable]() +val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] { Review Comment: Removed the `remoteStorageErrorHandler`