[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-10 Thread via GitHub


mehbey commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1290912365


##
core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:
##
@@ -120,17 +121,17 @@ class PlaintextProducerSendTest extends 
BaseProducerSendTest {
 }
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)

Review Comment:
   Thank you for clarifying that. I have added the `quorum` param and verified 
that the test name is updated at runtime as you described. Here is what I 
have captured from the test run output:
   ```
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, 
String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, 
long).quorum=zk PASSED
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, 
String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, 
long).quorum=zk PASSED
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, 
String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, 
long).quorum=zk PASSED
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, 
String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, 
long).quorum=kraft PASSED
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, 
String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, 
long).quorum=kraft PASSED
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
PlaintextProducerSendTest > testSendWithInvalidBeforeAndAfterTimestamp(String, 
String, long) > testSendWithInvalidBeforeAndAfterTimestamp(String, String, 
long).quorum=kraft PASSED
   ```
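   For anyone following along, here is a minimal standalone JUnit 5 sketch (a hypothetical test, not the Kafka test above, which uses `TestInfoUtils.TestWithParameterizedQuorumName`) showing how a name template surfaces the parameter in the reported test name:
   ```java
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;

   import static org.junit.jupiter.api.Assertions.assertTrue;

   class QuorumNameSketchTest {
       // Reported by Gradle as e.g. "exampleTest(String).quorum=kraft", mirroring the output above.
       @ParameterizedTest(name = "{displayName}.quorum={0}")
       @ValueSource(strings = {"zk", "kraft"})
       void exampleTest(String quorum) {
           assertTrue(quorum.equals("zk") || quorum.equals("kraft"));
       }
   }
   ```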



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mehbey commented on pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-10 Thread via GitHub


mehbey commented on PR #14135:
URL: https://github.com/apache/kafka/pull/14135#issuecomment-1674232688

   > Whenever we are making customer facing changes (adding new config in this 
scenario), we usually mention it in upgrade notes at
   > 
   > 
https://github.com/apache/kafka/blob/ddeb89f4a9f31b5ff63661e18a94c403a8f45f69/docs/upgrade.html#L24
   > 
   > Please add information about change made in this PR over there.
   
   Updated the documentation to indicate that we have deprecated the 
`log.message.timestamp.difference.max.ms` config and added two new configs.
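   For reference, a sketch of the new broker-level settings (config names as described in KIP-937; values shown are illustrative only, so please confirm against the merged upgrade notes):
   ```
   # Deprecated single bound on timestamp drift in either direction:
   # log.message.timestamp.difference.max.ms=9223372036854775807
   # New per-direction bounds introduced by KIP-937 (illustrative values):
   log.message.timestamp.before.max.ms=9223372036854775807
   log.message.timestamp.after.max.ms=9223372036854775807
   ```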


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?

2023-08-10 Thread Kong Yin Lai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753049#comment-17753049
 ] 

Kong Yin Lai edited comment on KAFKA-10875 at 8/11/23 5:14 AM:
---

We are also seeing something that looks a lot like this issue. Software we are 
using:
 * Kafka version 2.6.0
 * librdkafka 1.9.2

It is quite rare and only seems to affect "compact,delete" topics where a lot 
of compaction is happening, i.e. data is continually being published that 
"replaces" almost all of the existing data with the same key. And in all cases seen 
so far, a restart of the client would successfully retrieve the offset for the 
time that it requests (which is 12am of the current day in local time).
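For reference, a minimal sketch (assumed Java-client setup, not the reporter's librdkafka code) of requesting the first offsets at or after local midnight of the current day, as described above; a null entry in the result means no offset at or after that timestamp was found for the partition:
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Map;
import java.util.stream.Collectors;

class MidnightOffsetLookup {
    // Look up, for each assigned partition, the earliest offset whose timestamp is >= local midnight.
    static Map<TopicPartition, OffsetAndTimestamp> offsetsSinceLocalMidnight(KafkaConsumer<?, ?> consumer) {
        long midnightMs = LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
        return consumer.offsetsForTimes(consumer.assignment().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> midnightMs)));
    }
}
{code}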


was (Author: kong):
We are also seeing something that looks a lot like this issue. Software we are 
using:
 * Kafka version 2.6.0
 * librdkakfka 1.9.2

It is quite rare and only seems to affect "compact,delete" topics where a lot 
of compaction is happening i.e. data is continually being published that 
"replaces" almost all of existing data with the same key. And in all cases seen 
so far, a restart of the client would successfully retrieve the offset for the 
time that it requests (which is 12am of the current day in local time).

> offsetsForTimes returns null for some partitions when it shouldn't?
> ---
>
> Key: KAFKA-10875
> URL: https://issues.apache.org/jira/browse/KAFKA-10875
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yifei Gong
>Priority: Minor
>
> I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1
> I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am 
> trying to seek to offsets after certain timestamp inside 
> {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}.
> I found this strange behavior of method {{offsetsForTimes}}:
> When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 
> 5:06:55.534 AM) like below:
> {code:java}
> @Override
> public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
> // calling assignment just to ensure my consumer is actually assigned the partitions
> Set<TopicPartition> tps = consumer.assignment();
> Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
> offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
> .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L))));
> }
> {code}
> By setting breakpoint, I can see I got below map:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
> {TopicPartition@5498} "My.Data.Topic-5" -> null
> {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} 
> "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
> {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} 
> "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)" 
> {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} 
> "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)" 
> {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} 
> "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)" 
> {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} 
> "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)" 
> {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} 
> "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)" 
> {TopicPartition@5518} "My.Data.Topic-6" -> null
> {noformat}
> As you can see, it returned null for some of the partitions (5 and 6).
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT 
> December 14, 2020 10:30:18.423 AM), I got offsets for all partitions:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)"
> {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} 
> "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)"
> {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} 
> "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)"
> {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} 
> "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)"
> {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} 
> "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)"
> {TopicPartition@5510} "My.Data.Topic-9" -> 

[jira] [Commented] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?

2023-08-10 Thread Kong Yin Lai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753049#comment-17753049
 ] 

Kong Yin Lai commented on KAFKA-10875:
--

We are also seeing something that looks a lot like this issue. Software we are 
using:
 * Kafka version 2.6.0
 * librdkakfka 1.9.2

It is quite rare and only seems to affect "compact,delete" topics where a lot 
of compaction is happening i.e. data is continually being published that 
"replaces" almost all of existing data with the same key. And in all cases seen 
so far, a restart of the client would successfully retrieve the offset for the 
time that it requests (which is 12am of the current day in local time).

> offsetsForTimes returns null for some partitions when it shouldn't?
> ---
>
> Key: KAFKA-10875
> URL: https://issues.apache.org/jira/browse/KAFKA-10875
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yifei Gong
>Priority: Minor
>
> I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1
> I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am 
> trying to seek to offsets after certain timestamp inside 
> {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}.
> I found this strange behavior of method {{offsetsForTimes}}:
> When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 
> 5:06:55.534 AM) like below:
> {code:java}
> @Override
> public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
> // calling assignment just to ensure my consumer is actually assigned the partitions
> Set<TopicPartition> tps = consumer.assignment();
> Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
> offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
> .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L))));
> }
> {code}
> By setting breakpoint, I can see I got below map:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
> {TopicPartition@5498} "My.Data.Topic-5" -> null
> {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} 
> "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
> {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} 
> "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)" 
> {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} 
> "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)" 
> {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} 
> "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)" 
> {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} 
> "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)" 
> {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} 
> "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)" 
> {TopicPartition@5518} "My.Data.Topic-6" -> null
> {noformat}
> As you can see, it returned null for some of the partitions (5 and 6).
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT 
> December 14, 2020 10:30:18.423 AM), I got offsets for all partitions:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)"
> {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} 
> "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)"
> {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} 
> "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)"
> {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} 
> "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)"
> {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} 
> "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)"
> {TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} 
> "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)"
> {TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} 
> "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)"
> {TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} 
> "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)"
> {TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} 
> "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)"
> {noformat}
> So I am confused why seeking to an older timestamp gave me nulls when there 
> are indeed 

[GitHub] [kafka] bachmanity1 commented on pull request #14190: KAFKA-7438: Replace Easymock & Powermock with Mockito in RocksDBMetricsRecorderGa…

2023-08-10 Thread via GitHub


bachmanity1 commented on PR #14190:
URL: https://github.com/apache/kafka/pull/14190#issuecomment-1674220529

   @cadonna @divijvaidya can you please review this? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bachmanity1 opened a new pull request, #14190: KAFKA-7438: Replace Easymock & Powermock with Mockito in RocksDBMetricsRecorderGa…

2023-08-10 Thread via GitHub


bachmanity1 opened a new pull request, #14190:
URL: https://github.com/apache/kafka/pull/14190

   Replace Easymock & Powermock with Mockito in 
RocksDBMetricsRecorderGaugesTest.
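   
   A hypothetical illustration (not code from this PR) of the general shape of such a migration: Mockito stubbing and verification in place of EasyMock's expect/replay/verify lifecycle.
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;

   import java.util.List;

   class MigrationSketch {
       @SuppressWarnings("unchecked")
       static void example() {
           // EasyMock: expect(names.get(0)).andReturn("x"); replay(names);
           List<String> names = mock(List.class);
           when(names.get(0)).thenReturn("x");

           String value = names.get(0); // exercise the mock

           // EasyMock: verify(names); Mockito verifies individual interactions instead.
           verify(names).get(0);
           assert value.equals("x");
       }
   }
   ```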
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bachmanity1 commented on pull request #14189: KAFKA-7438: Replace Easymock & Powermock with Mockito in TableSourceNodeTest

2023-08-10 Thread via GitHub


bachmanity1 commented on PR #14189:
URL: https://github.com/apache/kafka/pull/14189#issuecomment-1674210277

   @cadonna @mjsax @C0urante can you please review this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bachmanity1 opened a new pull request, #14189: KAFKA-7438: Replace Easymock & Powermock with Mockito in TableSourceNodeTest

2023-08-10 Thread via GitHub


bachmanity1 opened a new pull request, #14189:
URL: https://github.com/apache/kafka/pull/14189

   Replace Easymock & Powermock with Mockito in TableSourceNodeTest
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable

2023-08-10 Thread via GitHub


C0urante commented on PR #14177:
URL: https://github.com/apache/kafka/pull/14177#issuecomment-1674205775

   Changes seem fine, but the CI results are a little worrying. Nothing directly 
tied to this PR on the surface, but I've kicked off another build just to be 
safe.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build

2023-08-10 Thread via GitHub


C0urante commented on PR #14186:
URL: https://github.com/apache/kafka/pull/14186#issuecomment-1674202980

   @divijvaidya given that this touches on the strict stubbing feature that 
you've helped introduce to our tests, would you be interested in reviewing?
   
   @mumrah @ijuma based on discussion on 
[KAFKA-4594](https://issues.apache.org/jira/browse/KAFKA-4594) and 
https://github.com/apache/kafka/pull/2695 it seems like the original intent 
behind the two-task approach (i.e., running `unitTest integrationTest` instead 
of just `test`) was to get faster feedback from our CI builds, but that this 
was a minor concern compared to the goal of the ticket to just be able to run 
unit tests in isolation locally. Given that it's a bit like finding a needle in 
a haystack now to sift through in-progress CI results and this PR doesn't 
compromise the original goal laid out in Jira, this change seems safe enough to 
make. However, in case there's context I'm missing or you feel differently, I 
wanted to give you a chance to weigh in on this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290880628


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;

Review Comment:
   Changed a few of them; we discussed the rest on a call.
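   
   For context, a minimal sketch (an assumed helper, not code from this PR) of the quota computation described in the class javadoc above: each member receives at least `totalPartitions / memberCount` partitions, and `totalPartitions % memberCount` members receive one extra.
   ```java
   final class QuotaSketch {
       // Returns {minQuota, numMembersWithExtraPartition} for the given group shape.
       static int[] quotas(int totalPartitions, int memberCount) {
           int minQuota = totalPartitions / memberCount;
           int membersWithExtraPartition = totalPartitions % memberCount;
           return new int[]{minQuota, membersWithExtraPartition};
       }
   }
   ```
   For example, 10 partitions across 3 members yields minQuota = 3 with 1 member receiving a 4th partition.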



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290880471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;
+// Count of members expected to receive an extra partition beyond the 
minimum quota,
+// to account for the distribution of the remaining partitions.
+private int expectedNumMembersWithExtraPartition;
+// Map of members to their remaining partitions needed to meet the minimum 
quota,
+// including members eligible for an extra partition.
+private final Map potentiallyUnfilledMembers;
+// Members mapped to the number of partitions they still need to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map unfilledMembers;
+// Partitions that still need to be assigned.
+private List unassignedPartitions;
+private final Map newAssignment;
+// Tracks the previous owner of each partition when using rack-aware 
strategy.
+private final Map partitionToPrevOwner;
+// Indicates if a rack aware assignment can be done.
+// True if racks are defined for both members and partitions.
+boolean useRackAwareStrategy;
+
+OptimizedUniformAssignor(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.assignmentSpec = assignmentSpec;
+
+subscriptionList = new 
ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+
+int totalPartitionsCount = 0;
+// Removes the current topic from 

[GitHub] [kafka] C0urante commented on pull request #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build

2023-08-10 Thread via GitHub


C0urante commented on PR #14186:
URL: https://github.com/apache/kafka/pull/14186#issuecomment-1674193833

   As we can see with the [CI results for the first 
build](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14186/1/tests),
 unused stubbings were successfully reported. I've reverted the 
intentionally-failing commit now, and this should be ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-08-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15083:
--
Description: 
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
properties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.

 

When configuring RLMM, the config passed into the {{configure}} method is the 
{{{}RemoteLogManagerConfig{}}}. But {{{}RemoteLogManagerConfig{}}} contains no 
configs related to {{{}remote.log.metadata.*{}}}, e.g. 
{{{}remote.log.metadata.topic.replication.factor{}}}, so even if users have set 
the config on the broker, it will never be applied.

This PR fixes the issue by letting users set an RLMM prefix, 
{{remote.log.metadata.manager.impl.prefix}} (default {{{}rlmm.config.{}}}); any 
desired {{remote.log.metadata.*}} configs appended to that prefix are passed into 
RLMM, including those with the 
{{{}remote.log.metadata.common.client.{}}}/{{{}remote.log.metadata.producer.{}}}/
 {{remote.log.metadata.consumer.}} prefixes.

Ex:
{code:java}
# default value 
# remote.log.storage.manager.impl.prefix=rsm.config. 
# remote.log.metadata.manager.impl.prefix=rlmm.config. 

# pass to RLMM
rlmm.config.remote.log.metadata.topic.num.partitions=50 
rlmm.config.remote.log.metadata.topic.replication.factor=4 

# pass to RSM
rsm.config.test=value {code}

  was:
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
properties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.

 

When configuring RLMM, the configs passed into {{configure}} method is the 
{{{}RemoteLogManagerConfig{}}}. But in {{{}RemoteLogManagerConfig{}}}, there's 
no configs related to {{{}remote.log.metadata.*{}}}, ex: 
{{{}remote.log.metadata.topic.replication.factor{}}}. So, even if users have 
set the config in broker, it'll never be applied.

This PR fixed the issue to allow users setting RLMM prefix: 
{{remote.log.metadata.manager.impl.prefix}} (default is {{{}rlmm.config.{}}}), 
and then, appending the desired {{remote.log.metadata.*}} configs, it'll pass 
into RLMM, including 
{{{}remote.log.metadata.common.client.{}}}/{{{}remote.log.metadata.producer.{}}}/
 {{remote.log.metadata.consumer.}} prefixes.

Ex:
# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4

rsm.config.test=value


> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
> implementation creates producer and consumer instances. Common client 
> properties can be configured with `remote.log.metadata.common.client.` 
> prefix.  User can also pass properties 

[GitHub] [kafka] showuon merged pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-10 Thread via GitHub


showuon merged PR #14151:
URL: https://github.com/apache/kafka/pull/14151


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-08-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15083:
--
Description: 
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
properties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.

 

When configuring RLMM, the configs passed into {{configure}} method is the 
{{{}RemoteLogManagerConfig{}}}. But in {{{}RemoteLogManagerConfig{}}}, there's 
no configs related to {{{}remote.log.metadata.*{}}}, ex: 
{{{}remote.log.metadata.topic.replication.factor{}}}. So, even if users have 
set the config in broker, it'll never be applied.

This PR fixed the issue to allow users setting RLMM prefix: 
{{remote.log.metadata.manager.impl.prefix}} (default is {{{}rlmm.config.{}}}), 
and then, appending the desired {{remote.log.metadata.*}} configs, it'll pass 
into RLMM, including 
{{{}remote.log.metadata.common.client.{}}}/{{{}remote.log.metadata.producer.{}}}/
 {{remote.log.metadata.consumer.}} prefixes.

Ex:
# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4

rsm.config.test=value

  was:
Based on the 
[KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
|_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
implementation creates producer and consumer instances. Common client 
properties can be configured with `remote.log.metadata.common.client.` prefix. 
 User can also pass properties specific to 
{color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
and `remote.log.metadata.consumer.` prefixes. These will override properties 
with `remote.log.metadata.common.client.` prefix.{color}
{color:#00}Any other properties should be prefixed with 
"remote.log.metadata." and these will be passed to 
RemoteLogMetadataManager#configure(Map props).{color}
{color:#00}For ex: Security configuration to connect to the local broker 
for the listener name configured are passed with props.{color}|

 

This is missed from current implementation.


> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
> implementation creates producer and consumer instances. Common client 
> properties can be configured with `remote.log.metadata.common.client.` 
> prefix.  User can also pass properties specific to 
> {color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with `remote.log.metadata.common.client.` prefix.{color}
> {color:#00}Any other properties should be prefixed with 
> "remote.log.metadata." and these will be passed to 
> RemoteLogMetadataManager#configure(Map props).{color}
> {color:#00}For ex: Security configuration to connect to the local broker 
> for the listener name configured are passed with props.{color}|
>  
> This is missed from current implementation.
>  
> When configuring RLMM, the configs passed into {{configure}} method is the 
> {{{}RemoteLogManagerConfig{}}}. But in {{{}RemoteLogManagerConfig{}}}, 
> there's no configs related to {{{}remote.log.metadata.*{}}}, ex: 
> {{{}remote.log.metadata.topic.replication.factor{}}}. So, even if users have 
> set the config in broker, it'll never be applied.
> This PR fixed the issue to allow users setting RLMM prefix: 
> 

[GitHub] [kafka] showuon commented on pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-10 Thread via GitHub


showuon commented on PR #14151:
URL: https://github.com/apache/kafka/pull/14151#issuecomment-1674147451

   Failed tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
kafka.log.remote.RemoteIndexCacheTest.testClose()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testConfigResourceExistenceChecker()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-08-10 Thread via GitHub


showuon merged PR #14121:
URL: https://github.com/apache/kafka/pull/14121


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-08-10 Thread via GitHub


showuon commented on PR #14121:
URL: https://github.com/apache/kafka/pull/14121#issuecomment-1674146055

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest

2023-08-10 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming updated KAFKA-15289:

Description: we are calling `zkBrokerApis` in , we should ensure kraft 
broker apis are also supported, so use clientApis as far as possible.use 
zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis in most cases  (was: In 
most Test cases, we are calling `zkBrokerApis`, we should ensure kraft broker 
apis are also supported, so use clientApis as far as possible.)

> Support KRaft mode in RequestQuotaTest
> --
>
> Key: KAFKA-15289
> URL: https://issues.apache.org/jira/browse/KAFKA-15289
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Major
>  Labels: newbee
>
> we are calling `zkBrokerApis` in , we should ensure kraft broker apis are 
> also supported, so use clientApis as far as possible. Use 
> zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis in most cases



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest

2023-08-10 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming updated KAFKA-15289:

Description: we are calling `zkBrokerApis` in RequestQuotaTest, we should 
ensure kraft broker apis are also supported, so use clientApis as far as 
possible. Use zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis.  (was: we 
are calling `zkBrokerApis` in , we should ensure kraft broker apis are also 
supported, so use clientApis as far as possible.use zkBrokerApis.clientApis 
instead of ApiKeys.zkBrokerApis in most cases)

> Support KRaft mode in RequestQuotaTest
> --
>
> Key: KAFKA-15289
> URL: https://issues.apache.org/jira/browse/KAFKA-15289
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Major
>  Labels: newbee
>
> we are calling `zkBrokerApis` in RequestQuotaTest, we should ensure kraft 
> broker apis are also supported, so use clientApis as far as possible. Use 
> zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest

2023-08-10 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming updated KAFKA-15289:

Summary: Support KRaft mode in RequestQuotaTest  (was: Use 
zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis in most cases)

> Support KRaft mode in RequestQuotaTest
> --
>
> Key: KAFKA-15289
> URL: https://issues.apache.org/jira/browse/KAFKA-15289
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Major
>  Labels: newbee
>
> In most Test cases, we are calling `zkBrokerApis`, we should ensure kraft 
> broker apis are also supported, so use clientApis as far as possible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15287) Change NodeApiVersions.create() to contains both apis of zk and kraft broker

2023-08-10 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming resolved KAFKA-15287.
-
Resolution: Fixed

> Change NodeApiVersions.create() to contains both apis of zk and kraft broker 
> -
>
> Key: KAFKA-15287
> URL: https://issues.apache.org/jira/browse/KAFKA-15287
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Major
>  Labels: newbee
>
> We are using ApiKeys.zkBrokerApis() when calling NodeApiVersions.create(), 
> this means we only support zk broker apis.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dengziming merged pull request #14185: KAFKA-15287: Change NodeApiVersions.create() to support kraft

2023-08-10 Thread via GitHub


dengziming merged PR #14185:
URL: https://github.com/apache/kafka/pull/14185


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2023-08-10 Thread Nelson Bighetti (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson Bighetti updated KAFKA-14132:

Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [[~bachmanity1] ])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ConnectorsResourceTest{color} (owner: 

[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2023-08-10 Thread Nelson Bighetti (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson Bighetti updated KAFKA-14132:

Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: @bachmanity ])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [[~bachmanity1] ])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> ------------------------------------
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
>  
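
As a rough illustration of the kind of mechanical change these migrations involve, here is a hypothetical before/after sketch. The `Worker` interface and the stubbed values below are placeholders, not code from any of the tests listed above:

```
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class MigrationSketch {

    // Placeholder collaborator, standing in for whatever the test mocks.
    interface Worker {
        String connectorName();
        void stop();
    }

    void exercise() {
        // EasyMock/PowerMock style (before):
        //   Worker worker = EasyMock.createMock(Worker.class);
        //   EasyMock.expect(worker.connectorName()).andReturn("my-connector");
        //   PowerMock.replayAll();
        //   ... run code under test ...
        //   PowerMock.verifyAll();

        // Mockito style (after): no replay phase, verification is explicit.
        Worker worker = mock(Worker.class);
        when(worker.connectorName()).thenReturn("my-connector");

        // ... run code under test ...
        worker.connectorName();
        worker.stop();

        verify(worker).stop();
    }
}
```

The coverage requirement in the description is the main thing to watch: stricter Mockito stubbing tends to surface expectations the old tests never actually exercised.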

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


jeffkbkim commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290813753


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;
+// Count of members expected to receive an extra partition beyond the 
minimum quota,
+// to account for the distribution of the remaining partitions.
+private int expectedNumMembersWithExtraPartition;
+// Map of members to their remaining partitions needed to meet the minimum 
quota,
+// including members eligible for an extra partition.
+private final Map potentiallyUnfilledMembers;
+// Members mapped to the number of partitions they still need to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map unfilledMembers;
+// Partitions that still need to be assigned.
+private List unassignedPartitions;
+private final Map newAssignment;
+// Tracks the previous owner of each partition when using rack-aware 
strategy.
+private final Map partitionToPrevOwner;

Review Comment:
   It is implied by the type that the previous owners are keyed by 
TopicIdPartition, like how we are using `potentiallyUnfilledMembers`, 
`unfilledMembers`, or `newAssignment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


jeffkbkim commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290813322


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;
+// Count of members expected to receive an extra partition beyond the 
minimum quota,
+// to account for the distribution of the remaining partitions.
+private int expectedNumMembersWithExtraPartition;

Review Comment:
   `numMembersWithExtraPartition` : number of members to get the extra 
partition, no?
   
   there is no expected vs. actual though, we always distribute that number
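
For context, the quota bookkeeping behind this discussion is just integer division plus a remainder; a minimal, self-contained sketch (the numbers and names are illustrative, not taken from the PR):

```
public class QuotaSketch {
    public static void main(String[] args) {
        int totalPartitions = 10;
        int memberCount = 3;

        // Every member must end up with at least minQuota partitions.
        int minQuota = totalPartitions / memberCount;                     // 3
        // The remainder is handed out as one extra partition per member,
        // so this many members end up with minQuota + 1.
        int numMembersWithExtraPartition = totalPartitions % memberCount; // 1

        System.out.println("minQuota=" + minQuota
            + ", membersWithExtraPartition=" + numMembersWithExtraPartition);
    }
}
```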



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14188: KAFKA-15239: Fix ThroughputThrottler import-control

2023-08-10 Thread via GitHub


gharris1727 merged PR #14188:
URL: https://github.com/apache/kafka/pull/14188


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #14188: KAFKA-15239: Fix ThroughputThrottler import-control

2023-08-10 Thread via GitHub


gharris1727 opened a new pull request, #14188:
URL: https://github.com/apache/kafka/pull/14188

   #14092 broke checkstyle due to the package name changing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (KAFKA-14598) Fix flaky ConnectRestApiTest

2023-08-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reopened KAFKA-14598:
-
  Assignee: (was: Ashwin Pankaj)

> Fix flaky ConnectRestApiTest
> ----------------------------
>
> Key: KAFKA-14598
> URL: https://issues.apache.org/jira/browse/KAFKA-14598
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ashwin Pankaj
>Priority: Minor
>  Labels: flaky-test
>
> ConnectRestApiTest sometimes fails with the message
> {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not 
> Found\n\nHTTP ERROR 404 Not 
> Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not
>  
> Found\nSERVLET:-\n\n\n\n\n',
>  'http://172.31.1.75:8083/connector-plugins/')}}
> This happens because ConnectDistributedService.start() by default waits till 
> the the line
> {{Joined group at generation ..}} is visible in the logs.
> In most cases this is sufficient. But in the cases where the test fails, we 
> see that this message appears even before Connect RestServer has finished 
> initialization.
>  {quote}   - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, 
> groupId=connect-cluster] Joined group at generation 2 with protocol version 1 
> and got assignment: Assignment{error=0, 
> leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', 
> leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], 
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 
> +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" 
> "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer)
> - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is 
> started and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {quote}
>  
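
Independently of how the ducktape service is eventually fixed, this kind of startup race can be avoided by waiting on the REST API itself rather than on the "Joined group" log line. A hypothetical Java sketch of such a readiness probe (the URL, path, and timeout are illustrative):

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class RestReadinessSketch {
    // Polls the Connect REST endpoint until it stops returning 404,
    // i.e. until the REST resources are actually registered.
    static boolean waitForConnectorPlugins(String baseUrl, Duration timeout) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/connector-plugins/")).GET().build();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            try {
                HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() == 200) {
                    return true;
                }
            } catch (java.io.IOException e) {
                // Server not listening yet; retry after a short pause.
            }
            Thread.sleep(500);
        }
        return false;
    }
}
```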



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290766391


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;
+// Count of members expected to receive an extra partition beyond the 
minimum quota,
+// to account for the distribution of the remaining partitions.
+private int expectedNumMembersWithExtraPartition;
+// Map of members to their remaining partitions needed to meet the minimum 
quota,
+// including members eligible for an extra partition.
+private final Map potentiallyUnfilledMembers;
+// Members mapped to the number of partitions they still need to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map unfilledMembers;
+// Partitions that still need to be assigned.
+private List unassignedPartitions;
+private final Map newAssignment;
+// Tracks the previous owner of each partition when using rack-aware 
strategy.
+private final Map partitionToPrevOwner;
+// Indicates if a rack aware assignment can be done.
+// True if racks are defined for both members and partitions.
+boolean useRackAwareStrategy;
+
+OptimizedUniformAssignor(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.assignmentSpec = assignmentSpec;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290765488


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;
+// Count of members expected to receive an extra partition beyond the 
minimum quota,
+// to account for the distribution of the remaining partitions.
+private int expectedNumMembersWithExtraPartition;
+// Map of members to their remaining partitions needed to meet the minimum 
quota,
+// including members eligible for an extra partition.
+private final Map potentiallyUnfilledMembers;
+// Members mapped to the number of partitions they still need to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map unfilledMembers;
+// Partitions that still need to be assigned.
+private List unassignedPartitions;
+private final Map newAssignment;
+// Tracks the previous owner of each partition when using rack-aware 
strategy.
+private final Map partitionToPrevOwner;
+// Indicates if a rack aware assignment can be done.
+// True if racks are defined for both members and partitions.
+boolean useRackAwareStrategy;
+
+OptimizedUniformAssignor(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.assignmentSpec = assignmentSpec;
+
+subscriptionList = new 
ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+
+int totalPartitionsCount = 0;
+// Removes the current topic from 

[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-10 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1290474356


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -140,17 +167,30 @@ public long metadataExpireMs() {
 }
 
 /**
- * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
+ * Request an update of the current cluster metadata info, permitting 
backoff based on the number of
+ * equivalent responses, which indicate that metadata responses did not 
make progress and may be stale.

Review Comment:
   Could we add a comment on when the caller should set 
permitBackoffOnEquivalentResponses to true? Also, when should 
backoffOnEquivalentResponses be reset to false?



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -114,18 +127,32 @@ public synchronized Cluster fetch() {
 
 /**
  * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+ * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+ * since the last successful response, and how many equivalent metadata 
responses have been received.
+ * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+ * metadata responses are clean.
+ * 
+ * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+ * not be delayed if this method returns 0.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+// Calculate the backoff for attempts which acts when metadata 
responses fail
+long backoffForAttempts = Math.max(this.lastRefreshMs + 
this.refreshBackoff.backoff(this.attempts) - nowMs, 0);
+
+// Calculate the backoff for equivalent responses which acts when 
metadata responses as not making progress

Review Comment:
   as not making => are not making ?



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -598,22 +608,22 @@ private void insertInSequenceOrder(Deque 
deque, ProducerBatch bat
 /**
  * Add the leader to the ready nodes if the batch is ready
  *
- * @param nowMs The current time
  * @param exhausted 'true' is the buffer pool is exhausted
  * @param part The partition
  * @param leader The leader for the partition
  * @param waitedTimeMs How long batch waited
  * @param backingOff Is backing off
+ * @param backoffAttempts Number of attempts for calculating backoff delay
  * @param full Is batch full
  * @param nextReadyCheckDelayMs The delay for next check
  * @param readyNodes The set of ready nodes (to be filled in)
  * @return The delay for next check
  */
-private long batchReady(long nowMs, boolean exhausted, TopicPartition 
part, Node leader,
-long waitedTimeMs, boolean backingOff, boolean 
full,
-long nextReadyCheckDelayMs, Set readyNodes) {
+private long batchReady(boolean exhausted, TopicPartition part, Node 
leader,
+long waitedTimeMs, boolean backingOff, int 
backoffAttempts,
+boolean full, long nextReadyCheckDelayMs, 
Set readyNodes) {
 if (!readyNodes.contains(leader) && !isMuted(part)) {
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+long timeToWaitMs = backingOff ? 
retryBackoff.backoff(backoffAttempts) : lingerMs;

Review Comment:
   Here, backoffAttempts is already subtracted by one when calling 
retryBackoff.backoff. In other places, we pass in the true attempts and 
subtract by one when calling retryBackoff.backoff. It would be useful to make 
that consistent.
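
To make the off-by-one question concrete, here is a self-contained stand-in for the exponential backoff calculation (not Kafka's actual `ExponentialBackoff` class); whether a caller passes `attempts` or `attempts - 1` shifts every delay by one multiplier step:

```
public class BackoffSketch {
    // Stand-in for the backoff helper discussed above. Convention here:
    // backoff(0) returns the initial interval, backoff(1) the next step, etc.,
    // so passing attempts vs. attempts - 1 changes every returned delay.
    static long backoff(long initialMs, double multiplier, long maxMs, long attempts) {
        double delay = initialMs * Math.pow(multiplier, attempts);
        return (long) Math.min(maxMs, delay);
    }

    public static void main(String[] args) {
        // With initial=100ms, multiplier=2, max=1000ms:
        // attempts=0 -> 100ms, 1 -> 200ms, 2 -> 400ms, 3 -> 800ms, 4 -> 1000ms
        for (long attempts = 0; attempts < 5; attempts++) {
            System.out.println(attempts + " -> " + backoff(100, 2.0, 1000, attempts) + "ms");
        }
    }
}
```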



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -550,7 +550,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
 // refresh metadata before re-joining the group as long as 
the refresh backoff time has
 // passed.
 if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) 
== 0) {
-this.metadata.requestUpdate();
+this.metadata.requestUpdate(true);

Review Comment:
   Why is permitBackoffOnEquivalentResponses set to true? We refresh the 
metadata here not because we have discovered stale metadata.



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -140,17 +167,30 @@ public long metadataExpireMs() {
 }
 
 /**
- * Request an update 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290764909


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;
+// Count of members expected to receive an extra partition beyond the 
minimum quota,
+// to account for the distribution of the remaining partitions.
+private int expectedNumMembersWithExtraPartition;
+// Map of members to their remaining partitions needed to meet the minimum 
quota,
+// including members eligible for an extra partition.
+private final Map potentiallyUnfilledMembers;
+// Members mapped to the number of partitions they still need to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map unfilledMembers;
+// Partitions that still need to be assigned.
+private List unassignedPartitions;
+private final Map newAssignment;
+// Tracks the previous owner of each partition when using rack-aware 
strategy.
+private final Map partitionToPrevOwner;

Review Comment:
   I think this name makes it clear that it's a map. If we just used 
`previousOwners`, it's not clear whose previous owner it is, right? We would then 
have to name it `previousPartitionOwners`, which is basically the same as 
`partitionToPreviousOwner`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290763624


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+private final RackInfo rackInfo;
+// The minimum required quota that each member needs to meet for a 
balanced assignment.
+// This is the same for all members.
+private final int minQuota;
+// Count of members expected to receive an extra partition beyond the 
minimum quota,
+// to account for the distribution of the remaining partitions.
+private int expectedNumMembersWithExtraPartition;

Review Comment:
   yeah because the extra partitions haven't been assigned yet, I'm just taking 
a count of how many are expected to get the extra partition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290763071


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *

Review Comment:
   including the priority order: balance > rack awareness > stickiness
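
A much-simplified sketch of that ordering, corresponding to steps 7-10 of the quoted javadoc (all names are illustrative and this is not the PR's implementation): a partition first looks for a rack-matching unfilled member and only then falls back to round-robin, so balance always wins over rack placement:

```
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class RackAwarePickSketch {
    // Prefer an unfilled member whose rack hosts a replica of the partition,
    // otherwise fall back to plain round-robin over the unfilled members.
    // Assumes unfilledMembers is non-empty.
    static String pickMember(List<String> unfilledMembers,
                             Map<String, String> memberRacks,
                             Set<String> partitionReplicaRacks,
                             int roundRobinIndex) {
        Optional<String> rackMatch = unfilledMembers.stream()
            .filter(m -> partitionReplicaRacks.contains(memberRacks.get(m)))
            .findFirst();
        // Only unfilled members are ever considered, so rack matching stays best effort.
        return rackMatch.orElse(unfilledMembers.get(roundRobinIndex % unfilledMembers.size()));
    }
}
```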



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known

2023-08-10 Thread via GitHub


philipnee commented on PR #14179:
URL: https://github.com/apache/kafka/pull/14179#issuecomment-1674034799

   @junrao - This minor PR fixes a small bug in the commit request manager. 
Would you be able to review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known

2023-08-10 Thread via GitHub


philipnee commented on PR #14179:
URL: https://github.com/apache/kafka/pull/14179#issuecomment-1674034179

   Here are the failing flaky tests.
   ```
   Build / JDK 17 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   1m 48s
   Build / JDK 17 and Scala 2.13 / 
testListenerConnectionRateLimitWhenActualRateAboveLimit() – 
kafka.network.ConnectionQuotasTest
   16s
   Build / JDK 17 and Scala 2.13 / 
shouldThrowStreamsExceptionOnStartupIfExceptionOccurred – 
org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest
   <1s
   Build / JDK 8 and Scala 2.12 / testReplication() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
   1m 45s
   Build / JDK 8 and Scala 2.12 / 
testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
   1m 17s
   Build / JDK 8 and Scala 2.12 / 
testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
   1m 16s
   Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   13s
   Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   14s
   Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   1m 55s
   Build / JDK 20 and Scala 2.13 / testSimultaneousUpwardAndDownwardDelegating 
– org.apache.kafka.connect.runtime.isolation.SynchronizationTest
   13s
   Build / JDK 20 and Scala 2.13 / testRackAwareRangeAssignor() – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   40s
   Build / JDK 20 and Scala 2.13 / 
testListenerConnectionRateLimitWhenActualRateAboveLimit() – 
kafka.network.ConnectionQuotasTest
   19s
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14187: KAFKA-13197: fix GlobalKTable join/left-join semantics documentation.

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14187:
URL: https://github.com/apache/kafka/pull/14187#discussion_r1290758646


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -2805,7 +2805,7 @@  KStream leftJoin(final KTable 
table,
  * For each {@code KStream} record that finds a corresponding record in 
{@link GlobalKTable} the provided
  * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
  * The key of the result record is the same as the key of this {@code 
KStream}.
- * If a {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
+ * If a {@code KStream} input value is {@code null} the record will not be 
included in the join
  * operation and thus no output record will be added to the resulting 
{@code KStream}.
  * If {@code keyValueMapper} returns {@code null} implying no match 
exists, no output record will be added to the
  * resulting {@code KStream}.

Review Comment:
   Should we merge both sentences:
   ```
   If a {@code KStream} input record key or value is {@code null} the record 
will not be included in the join
* If a {@code KStream} input value is {@code null} the record will not 
be included in the join
* operation and thus no output record will be added to the resulting 
{@code KStream}.
* If {@code keyValueMapper} returns {@code null} implying no match 
exists, no output record will be added to the
* resulting {@code KStream}.
   ```
   
   Into:
   ```
   * If a {@code KStream} input value is {@code null} or if {@code 
keyValueMapper} returns {@code null} the record
   * will not be included in the join operation and thus no output record will 
be added to the resulting {@code KStream}.
   ```
   
   Same below? (Note: for left-join the JavaDocs about `null` returned from 
`keyValueMapper` seem to be wrong, too?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2023-08-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13197:

Component/s: documentation
 streams

> KStream-GlobalKTable join semantics don't match documentation
> --------------------------------------------------------------
>
> Key: KAFKA-13197
> URL: https://issues.apache.org/jira/browse/KAFKA-13197
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 2.7.0
>Reporter: Tommy Becker
>Assignee: Florin Akermann
>Priority: Major
>
> As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was 
> changed. It appears the change was intended to merely relax a requirement but 
> it actually broke backwards compatibility. Although it does allow {{null}} 
> keys and values in the KStream to be joined, it now excludes {{null}} results 
> of the {{KeyValueMapper}}. We have an application which can return {{null}} 
> from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on 
> these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still 
> explicitly says this is done:
> {quote}If a KStream input record key or value is null the record will not be 
> included in the join operation and thus no output record will be added to the 
> resulting KStream.
>  If keyValueMapper returns null implying no match exists, a null value will 
> be provided to ValueJoiner.
> {quote}
> Both these statements are incorrect.
> I think the new behavior is worse than the previous/documented behavior. It 
> feels more reasonable to have a non-null stream record map to a null join key 
> (our use-case is event-enhancement where the incoming record doesn't have the 
> join field), than the reverse.
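
For readers following the semantics discussion, the join shape in question looks roughly like this (topic names, value types, and the extractor are illustrative; the comment marks the disputed behavior):

```
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalJoinSketch {
    static void build(StreamsBuilder builder) {
        KStream<String, String> orders = builder.stream("orders");
        GlobalKTable<String, String> customers = builder.globalTable("customers");

        // The KeyValueMapper extracts the lookup key from each stream record.
        // Per the ticket above, the javadoc promises that when it returns null
        // a null table value reaches the ValueJoiner, but since the KAFKA-10277
        // change the record is dropped instead.
        KStream<String, String> enriched = orders.leftJoin(
            customers,
            (orderKey, orderValue) -> extractCustomerId(orderValue),     // may return null
            (orderValue, customerValue) -> orderValue + " / " + customerValue
        );
        enriched.to("enriched-orders");
    }

    // Illustrative extractor: returns null when the record has no join field.
    static String extractCustomerId(String orderValue) {
        return orderValue == null || orderValue.isEmpty() ? null : orderValue.split(",")[0];
    }
}
```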



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-10 Thread via GitHub


gharris1727 commented on PR #14092:
URL: https://github.com/apache/kafka/pull/14092#issuecomment-1673942664

   Thank you @fvaleri for diagnosing this test regression and promptly fixing 
it!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0

2023-08-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15239.
-
Fix Version/s: 3.6.0
 Reviewer: Greg Harris
   Resolution: Fixed

> producerPerformance system test for old client failed after v3.5.0
> ------------------------------------------------------------------
>
> Key: KAFKA-15239
> URL: https://issues.apache.org/jira/browse/KAFKA-15239
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 3.6.0
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.6.0
>
>
> While running the producer performance tool in a system test for an old client 
> (e.g. quota_test), we try to run with the dev branch's jar file to make sure 
> it is backward compatible, as described 
> [here|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/producer_performance.py#L86-L88].
> {code:java}
> # In order to ensure more consistent configuration between versions, always 
> use the ProducerPerformance tool from the development branch {code}
>  
> But in KAFKA-14525, we're moving tools from the core module to a separate tools 
> module, which actually breaks backward compatibility. We should fix the 
> system test. Also, maybe we should document this backward compatibility issue 
> somewhere?
> Note:
> This is the command run in system test. Suppose it's testing old client 3.4.0 
> (file put under `~/Downloads/kafka_2.13-3.4.0` in my env), and running under 
> the latest trunk env.
> {code:java}
> > for file in ./tools/build/libs/kafka-tools*.jar; do 
> > CLASSPATH=$CLASSPATH:$file; done; for file in 
> > ./tools/build/dependant-libs*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; 
> > export CLASSPATH;  export 
> > KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:config/tools-log4j.properties";
> >  KAFKA_OPTS= KAFKA_HEAP_OPTS="-XX:+HeapDumpOnOutOfMemoryError" 
> > ~/Downloads/kafka_2.13-3.4.0/bin/kafka-run-class.sh 
> > org.apache.kafka.tools.ProducerPerformance --topic test_topic --num-records 
> > 5 --record-size 3000 --throughput -1 --producer-props 
> > bootstrap.servers=localhost:9092 client.id=overridden_id 
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/utils/ThroughputThrottler
>     at 
> org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101)
>     at 
> org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.utils.ThroughputThrottler
>     at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>     at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
>     ... 2 more
> {code}
>  
>  
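
The stack trace boils down to a classpath probe failing for one of the two possible package locations of the throttler class. A tiny hedged sketch of such a probe; the second package name is an assumption for illustration, not taken from the PR:

```
public class ClassLocationSketch {
    public static void main(String[] args) {
        String[] candidates = {
            "org.apache.kafka.common.utils.ThroughputThrottler", // location referenced in the stack trace
            "org.apache.kafka.tools.ThroughputThrottler"         // hypothetical relocated package
        };
        for (String name : candidates) {
            try {
                // Class.forName fails with ClassNotFoundException when the jar
                // providing that package is not on the launcher's classpath.
                Class.forName(name);
                System.out.println("found   " + name);
            } catch (ClassNotFoundException e) {
                System.out.println("missing " + name);
            }
        }
    }
}
```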



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 merged pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-10 Thread via GitHub


gharris1727 merged PR #14092:
URL: https://github.com/apache/kafka/pull/14092


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15202) MM2 OffsetSyncStore clears too many syncs when sync spacing is variable

2023-08-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15202:

Affects Version/s: 3.5.1
   (was: 3.3.3)

> MM2 OffsetSyncStore clears too many syncs when sync spacing is variable
> -----------------------------------------------------------------------
>
> Key: KAFKA-15202
> URL: https://issues.apache.org/jira/browse/KAFKA-15202
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.0, 3.4.1, 3.5.1
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.3.3, 3.4.2, 3.5.2
>
>
> The spacing between OffsetSyncs can vary significantly, due to conditions in 
> the upstream topic and in the replication rate of the MirrorSourceTask.
> The OffsetSyncStore attempts to keep a maximal number of distinct syncs 
> present, and for regularly spaced syncs it does not allow an incoming sync to 
> expire more than one other unique sync. There are tests to enforce this 
> property.
> For variable spaced syncs, there is no such guarantee, because multiple 
> fine-grained syncs may need to be expired at the same time. However, instead 
> of only those fine-grained syncs being expired, the store may also expire 
> coarser-grained syncs. This causes a large decrease in the number of unique 
> syncs.
> This is an extremely simple example: Syncs: 0 (start), 1, 2, 4.
> The result:
> {noformat}
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=1, 
> downstreamOffset=1} applied, new state is [1:1,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=2, 
> downstreamOffset=2} applied, new state is [2:2,1:1,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=4, 
> downstreamOffset=4} applied, new state is [4:4,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194){noformat}
> Instead of being expired, the 2:2 sync should still be present in the final 
> state, allowing the store to maintain 3 unique syncs.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15202) MM2 OffsetSyncStore clears too many syncs when sync spacing is variable

2023-08-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15202:

Fix Version/s: 3.3.3
   3.4.2
   3.5.2

> MM2 OffsetSyncStore clears too many syncs when sync spacing is variable
> -----------------------------------------------------------------------
>
> Key: KAFKA-15202
> URL: https://issues.apache.org/jira/browse/KAFKA-15202
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.3.3, 3.4.2, 3.5.2
>
>
> The spacing between OffsetSyncs can vary significantly, due to conditions in 
> the upstream topic and in the replication rate of the MirrorSourceTask.
> The OffsetSyncStore attempts to keep a maximal number of distinct syncs 
> present, and for regularly spaced syncs it does not allow an incoming sync to 
> expire more than one other unique sync. There are tests to enforce this 
> property.
> For variable spaced syncs, there is no such guarantee, because multiple 
> fine-grained syncs may need to be expired at the same time. However, instead 
> of only those fine-grained syncs being expired, the store may also expire 
> coarser-grained syncs. This causes a large decrease in the number of unique 
> syncs.
> This is an extremely simple example: Syncs: 0 (start), 1, 2, 4.
> The result:
> {noformat}
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=1, 
> downstreamOffset=1} applied, new state is [1:1,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=2, 
> downstreamOffset=2} applied, new state is [2:2,1:1,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194)
> TRACE New sync OffsetSync{topicPartition=topic1-2, upstreamOffset=4, 
> downstreamOffset=4} applied, new state is [4:4,0:0] 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:194){noformat}
> Instead of being expired, the 2:2 sync should still be present in the final 
> state, allowing the store to maintain 3 unique syncs.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13187) Replace EasyMock and PowerMock with Mockito for DistributedHerderTest

2023-08-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13187.
---
Fix Version/s: 3.6.0
   Resolution: Done

> Replace EasyMock and PowerMock with Mockito for DistributedHerderTest
> -
>
> Key: KAFKA-13187
> URL: https://issues.apache.org/jira/browse/KAFKA-13187
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-08-10 Thread via GitHub


C0urante merged PR #14102:
URL: https://github.com/apache/kafka/pull/14102


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-08-10 Thread via GitHub


C0urante commented on PR #14102:
URL: https://github.com/apache/kafka/pull/14102#issuecomment-1673896029

   I should note that because this test makes heavy use of Mockito's strict 
stubbing feature, I've opened https://github.com/apache/kafka/pull/14186 to 
enable reporting of unused stubbings in our CI builds. I've verified locally 
that there are no unused stubbings in this PR (`./gradlew :connect:runtime:test 
--tests DistributedHerderTest` does the trick), and any local build that uses 
the `test` task instead of `unitTest` should also report unused stubbings, so I 
don't believe it's necessary to block on 
https://github.com/apache/kafka/pull/14186 before merging this.
   
   Test failures in CI also appear unrelated. Merging...
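
   For readers unfamiliar with the strict stubbing feature referenced here, a minimal standalone sketch (assuming JUnit 4 and mockito-core on the classpath; this is not the DistributedHerderTest setup itself):
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.mockito.junit.MockitoJUnitRunner;

   // The StrictStubs runner fails a test class that leaves stubbings unused,
   // which is how unused stubbings get surfaced in CI builds.
   @RunWith(MockitoJUnitRunner.StrictStubs.class)
   public class StrictStubbingExample {

       interface Store {
           String get(String key);
       }

       @Test
       public void unusedStubbingIsReported() {
           Store store = mock(Store.class);
           when(store.get("never-used")).thenReturn("value");
           // The stub above is never exercised, so Mockito reports an
           // UnnecessaryStubbingException when the test class finishes.
       }
   }
   ```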


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14102:
URL: https://github.com/apache/kafka/pull/14102#discussion_r1290658873


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3872,7 +2952,7 @@ public void testVerifyTaskGeneration() {
 assertThrows(ConnectException.class, () -> 
herder.verifyTaskGenerationAndOwnership(unassignedTask, 1, verifyCallback));
 assertThrows(ConnectException.class, () -> 
herder.verifyTaskGenerationAndOwnership(unassignedTask, 2, verifyCallback));
 
-PowerMock.verifyAll();
+verify(verifyCallback, times(3)).onCompletion(isNull(), isNull());

Review Comment:
   Whoops!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14102:
URL: https://github.com/apache/kafka/pull/14102#discussion_r1290659476


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3929,543 +3008,636 @@ public void 
testPollDurationOnSlowConnectorOperations() {
 final int rebalanceDelayMs = 2;
 final long operationDelayMs = 1;
 final long maxPollWaitMs = rebalanceDelayMs - operationDelayMs;
-EasyMock.expect(member.memberId()).andStubReturn("member");
-
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+when(member.memberId()).thenReturn("member");
+
when(member.currentProtocolVersion()).thenReturn(connectProtocolVersion);
 
 // Assign the connector to this worker, and have it start
 expectRebalance(Collections.emptyList(), Collections.emptyList(), 
ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), 
Collections.emptyList(), rebalanceDelayMs);
 expectConfigRefreshAndSnapshot(SNAPSHOT);
-Capture<Callback<TargetState>> onFirstStart = newCapture();
-worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), 
EasyMock.anyObject(),
-EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), 
capture(onFirstStart));
-PowerMock.expectLastCall().andAnswer(() -> {
+ArgumentCaptor<Callback<TargetState>> onFirstStart = 
ArgumentCaptor.forClass(Callback.class);
+doAnswer(invocation -> {
 time.sleep(operationDelayMs);
 onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
 return true;
-});
-member.wakeup();
-PowerMock.expectLastCall();
-expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> 
TASK_CONFIGS);
-// We should poll for less than the delay - time to start the 
connector, meaning that a long connector start
-// does not delay the poll timeout
-member.poll(leq(maxPollWaitMs));
-PowerMock.expectLastCall();
+}).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG), any(), 
eq(herder), eq(TargetState.STARTED), onFirstStart.capture());
+expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> 
TASK_CONFIGS);
+
+herder.tick();
 
 // Rebalance again due to config update
-member.wakeup();
-PowerMock.expectLastCall();
 expectRebalance(Collections.emptyList(), Collections.emptyList(), 
ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), 
Collections.emptyList(), rebalanceDelayMs);
-
EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
-
-worker.stopAndAwaitConnector(CONN1);
-PowerMock.expectLastCall();
-
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
-Capture<Callback<TargetState>> onSecondStart = newCapture();
-worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), 
EasyMock.anyObject(),
-EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), 
capture(onSecondStart));
-PowerMock.expectLastCall().andAnswer(() -> {
+
when(configBackingStore.snapshot()).thenReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+doNothing().when(worker).stopAndAwaitConnector(CONN1);
+
+ArgumentCaptor<Callback<TargetState>> onSecondStart = 
ArgumentCaptor.forClass(Callback.class);
+doAnswer(invocation -> {
 time.sleep(operationDelayMs);
 onSecondStart.getValue().onCompletion(null, TargetState.STARTED);
 return true;
-});
-member.wakeup();
-PowerMock.expectLastCall();
-member.poll(leq(maxPollWaitMs));
-PowerMock.expectLastCall();
+}).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG_UPDATED), 
any(), eq(herder), eq(TargetState.STARTED), onSecondStart.capture());
+expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, 
invocation -> TASK_CONFIGS);
+
+configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated 
config
+herder.tick();
 
 // Third tick should resolve all outstanding requests
 expectRebalance(Collections.emptyList(), Collections.emptyList(), 
ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), 
Collections.emptyList(), rebalanceDelayMs);
 // which includes querying the connector task configs after the update
-expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, () -> {
+expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, 
invocation -> {
 time.sleep(operationDelayMs);
 return TASK_CONFIGS;
 });
-member.poll(leq(maxPollWaitMs));
-PowerMock.expectLastCall();
-
-PowerMock.replayAll();
-herder.tick();
-configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated 
config
-herder.tick();
 herder.tick();
-   
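
As an aside for readers following the EasyMock-to-Mockito conversion quoted above: the capture/expectLastCall pairs become doAnswer stubbings with an ArgumentCaptor. A small standalone sketch of that pattern (the Worker interface and names below are made up for illustration, not taken from the herder test):
```java
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import org.mockito.ArgumentCaptor;

public class DoAnswerCaptorSketch {

    interface Worker {
        void startConnector(String name, Runnable onStart);
    }

    public static void main(String[] args) {
        Worker worker = mock(Worker.class);

        // Capture the callback that the code under test passes in, and complete it
        // from inside the stubbed answer, mirroring the pattern in the diff above.
        ArgumentCaptor<Runnable> onStart = ArgumentCaptor.forClass(Runnable.class);
        doAnswer(invocation -> {
            onStart.getValue().run();
            return null;
        }).when(worker).startConnector(eq("conn-1"), onStart.capture());

        worker.startConnector("conn-1", () -> System.out.println("connector started"));
    }
}
```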

[GitHub] [kafka] rittikaadhikari commented on pull request #13503: MINOR: Refactor TierStateMachine related tests into a separate test file

2023-08-10 Thread via GitHub


rittikaadhikari commented on PR #13503:
URL: https://github.com/apache/kafka/pull/13503#issuecomment-1673867152

   @junrao Thanks for the review! I have addressed the comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-10 Thread via GitHub


gharris1727 merged PR #14156:
URL: https://github.com/apache/kafka/pull/14156


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-10 Thread via GitHub


gharris1727 commented on PR #14156:
URL: https://github.com/apache/kafka/pull/14156#issuecomment-1673861990

   The latest CI run has some unrelated flakiness, and tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14082:
URL: https://github.com/apache/kafka/pull/14082#discussion_r1290641476


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java:
##
@@ -43,6 +47,11 @@ public void configure(Map props) {
 log.info("Using custom remote topic separator: '{}'", separator);
 separatorPattern = Pattern.compile(Pattern.quote(separator));
 }
+
+if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+log.info("Disable the usage of topic separator for internal 
topics");

Review Comment:
   I think the first part of this still needs to be addressed? Specifically, we 
should only emit the log statement if `isInternalTopicSeparatorEnabled` is 
`false`.
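
   In other words, the suggestion is roughly the following shape (a sketch only, reusing the names from the quoted diff and from this comment; the exact log wording is up to the PR author):
   ```java
   // Only log when the separator is actually being disabled for internal topics.
   if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG) && !isInternalTopicSeparatorEnabled) {
       log.info("Disabling the use of the topic separator for internal topics");
   }
   ```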



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-10 Thread via GitHub


gharris1727 commented on PR #14156:
URL: https://github.com/apache/kafka/pull/14156#issuecomment-1673859231

   I saw one of the CI builds with testOffsetTranslationBehindReplicationFlow 
failing, so I think there is still another source of flakiness somewhere. I'll 
close the fix JIRA but leave the flaky JIRA open.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-10 Thread via GitHub


jeffkbkim commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1290585581


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *

Review Comment:
   I would include the priorities when considering uniform vs. sticky vs. 
rack-aware



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignor.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
previous owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignor extends UniformAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignor.class);
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final 
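
For readers skimming the step-by-step breakdown quoted above, the quota computation in the first step boils down to integer division with a remainder, roughly like this (a sketch with made-up numbers, not the assignor's actual code):
```java
public class QuotaSketch {
    public static void main(String[] args) {
        int totalPartitions = 10;
        int memberCount = 3;

        int baseQuota = totalPartitions / memberCount;        // every member gets at least 3 partitions
        int membersWithExtra = totalPartitions % memberCount; // 1 member is eligible for one extra partition

        System.out.println("base quota = " + baseQuota + ", members with an extra partition = " + membersWithExtra);
    }
}
```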

[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1290629836


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.
+
+Verifying Plugin 
Compatibility
+
+To verify if all of your plugins are compatible, first ensure that you 
are using version 3.6 or later of the Connect runtime. You can then perform one 
of the following checks:
+
+
+Start your worker with the default 
HYBRID_WARN strategy, and WARN logs enabled for the 
org.apache.kafka.connect package. At least one WARN log message 
mentioning the plugin.discovery configuration should be printed. 
This log message will explicitly say that all plugins are compatible, or list 
the incompatible plugins.
+Start your worker in a test environment with 
HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and 
all incompatible plugins will be listed in the exception.
+
+
+If the verification step succeeds, then your current set of installed 
plugins are compatible, and it should be safe to change the 
plugin.discovery configuration to SERVICE_LOAD. If 
you change the set of already-installed plugins, they may no longer be 
compatible, and you should repeat the above verification. If the verification 
fails, you must address the incompatible plugins before using the 
SERVICE_LOAD strategy.
+
+Operators: Artifact 
Migration
+
+As an operator of Connect, if you discover incompatible plugins, there 
are multiple ways to try to resolve the incompatibility. They are listed below 
from most to least preferable.
+
+
+Upgrade your incompatible plugins to the latest release version 
from your plugin provider.
+Contact your plugin provider and request that they migrate the 
plugin to be compatible, following the source migration 
instructions, and then upgrade to the migrated version.
+Migrate the plugin artifacts yourself using the included migration 
script.
+
+
+The migration script is located in 
bin/connect-plugin-path.sh and 
bin\windows\connect-plugin-path.bat of your Kafka installation. 
The script can migrate incompatible plugin artifacts already installed on your 
Connect worker's plugin.path by adding or modifying JAR or 
resource files. This is not suitable for environments using code-signing, as 
this may change the artifacts such that they will fail signature verification. 
View the built-in help with --help.
+
+To perform a migration, first use the list subcommand to 
get an overview of the plugins available to the script. You must tell the 
script where to find plugins, which can be done with the repeatable 
--worker-config, --plugin-path, and 
--plugin-location arguments. The script will only migrate plugins 
present in the paths specified, so if you add plugins to your worker's 
classpath, then you will need to specify those plugins via one or more 
--plugin-location arguments.
+
+Once you see that all incompatible plugins are included in the listing, 
you can proceed to dry-run the migration with sync-manifests 
--dry-run. This will perform all parts of the migration, except for 
writing the results of the migration to disk. Note that the 
sync-manifests command requires all specified paths to be 
writable, and may alter the contents of the directories. Make a backup of the 
specified paths, or copy them to a writable directory.

Review Comment:
   LGTM, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1290627899


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.
+
+Verifying Plugin 
Compatibility
+
+To verify if all of your plugins are compatible, first ensure that you 
are using version 3.6 or later of the Connect runtime. You can then perform one 
of the following checks:
+
+
+Start your worker with the default 
HYBRID_WARN strategy, and WARN logs enabled for the 
org.apache.kafka.connect package. At least one WARN log message 
mentioning the plugin.discovery configuration should be printed. 
This log message will explicitly say that all plugins are compatible, or list 
the incompatible plugins.
+Start your worker in a test environment with 
HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and 
all incompatible plugins will be listed in the exception.
+
+
+If the verification step succeeds, then your current set of installed 
plugins are compatible, and it should be safe to change the 
plugin.discovery configuration to SERVICE_LOAD. If 
you change the set of already-installed plugins, they may no longer be 
compatible, and you should repeat the above verification. If the verification 
fails, you must address the incompatible plugins before using the 
SERVICE_LOAD strategy.
+
+Operators: Artifact 
Migration
+
+As an operator of Connect, if you discover incompatible plugins, there 
are multiple ways to try to resolve the incompatibility. They are listed below 
from most to least preferable.
+
+
+Upgrade your incompatible plugins to the latest release version 
from your plugin provider.
+Contact your plugin provider and request that they migrate the 
plugin to be compatible, following the source migration 
instructions, and then upgrade to the migrated version.
+Migrate the plugin artifacts yourself using the included migration 
script.
+
+
+The migration script is located in 
bin/connect-plugin-path.sh and 
bin\windows\connect-plugin-path.bat of your Kafka installation. 
The script can migrate incompatible plugin artifacts already installed on your 
Connect worker's plugin.path by adding or modifying JAR or 
resource files. This is not suitable for environments using code-signing, as 
this may change the artifacts such that they will fail signature verification. 
View the built-in help with --help.
+
+To perform a migration, first use the list subcommand to 
get an overview of the plugins available to the script. You must tell the 
script where to find plugins, which can be done with the repeatable 
--worker-config, --plugin-path, and 
--plugin-location arguments. The script will only migrate plugins 
present in the paths specified, so if you add plugins to your worker's 
classpath, then you will need to specify those plugins via one or more 
--plugin-location arguments.
+
+Once you see that all incompatible plugins are included in the listing, 
you can proceed to dry-run the migration with sync-manifests 
--dry-run. This will perform all parts of the migration, except for 
writing the results of the migration to disk. Note that the 
sync-manifests command requires all specified paths to be 
writable, and may alter the contents of the directories. Make a backup of the 
specified paths, or copy them to a writable directory.
+
+Ensure that you have a backup and the dry-run succeeds before removing 
the --dry-run flag and actually running the migration. If the 
migration fails without the --dry-run flag, then the partially 
migrated artifacts should be discarded. The migration is idempotent, so running 
it multiple times and on already-migrated plugins is safe. After the migration 
is completed, you should 

[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1290624547


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.
+
+Verifying Plugin 
Compatibility
+
+To verify if all of your plugins are compatible, first ensure that you 
are using version 3.6 or later of the Connect runtime. You can then perform one 
of the following checks:
+
+
+Start your worker with the default 
HYBRID_WARN strategy, and WARN logs enabled for the 
org.apache.kafka.connect package. At least one WARN log message 
mentioning the plugin.discovery configuration should be printed. 
This log message will explicitly say that all plugins are compatible, or list 
the incompatible plugins.
+Start your worker in a test environment with 
HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and 
all incompatible plugins will be listed in the exception.
+
+
+If the verification step succeeds, then your current set of installed 
plugins are compatible, and it should be safe to change the 
plugin.discovery configuration to SERVICE_LOAD. If 
you change the set of already-installed plugins, they may no longer be 
compatible, and you should repeat the above verification. If the verification 
fails, you must address the incompatible plugins before using the 
SERVICE_LOAD strategy.
+
+Operators: Artifact 
Migration
+
+As an operator of Connect, if you discover incompatible plugins, there 
are multiple ways to try to resolve the incompatibility. They are listed below 
from most to least preferable.
+
+
+Upgrade your incompatible plugins to the latest release version 
from your plugin provider.
+Contact your plugin provider and request that they migrate the 
plugin to be compatible, following the source migration 
instructions, and then upgrade to the migrated version.
+Migrate the plugin artifacts yourself using the included migration 
script.
+
+
+The migration script is located in 
bin/connect-plugin-path.sh and 
bin\windows\connect-plugin-path.bat of your Kafka installation. 
The script can migrate incompatible plugin artifacts already installed on your 
Connect worker's plugin.path by adding or modifying JAR or 
resource files. This is not suitable for environments using code-signing, as 
this may change the artifacts such that they will fail signature verification. 
View the built-in help with --help.
+
+To perform a migration, first use the list subcommand to 
get an overview of the plugins available to the script. You must tell the 
script where to find plugins, which can be done with the repeatable 
--worker-config, --plugin-path, and 
--plugin-location arguments. The script will only migrate plugins 
present in the paths specified, so if you add plugins to your worker's 
classpath, then you will need to specify those plugins via one or more 
--plugin-location arguments.

Review Comment:
   Thanks, wish we didn't have this discrepancy in behavior but I'm convinced 
that you've thoroughly investigated and there aren't any maintainable 
alternatives 



##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-10 Thread via GitHub


gharris1727 commented on code in PR #14064:
URL: https://github.com/apache/kafka/pull/14064#discussion_r1290624348


##
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java:
##
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
+import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConnectPluginPath {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+public static final Object[] LIST_TABLE_COLUMNS = {
+"pluginName",
+"firstAlias",
+"secondAlias",
+"pluginVersion",
+"pluginType",
+"isLoadable",
+"hasManifest",
+"pluginLocation" // last because it is least important and most 
repetitive
+};
+public static final String NO_ALIAS = "N/A";
+
+public static void main(String[] args) {
+Exit.exit(mainNoExit(args, System.out, System.err));
+}
+
+public static int mainNoExit(String[] args, PrintStream out, PrintStream 
err) {
+ArgumentParser parser = parser();
+try {
+Namespace namespace = parser.parseArgs(args);
+Config config = parseConfig(parser, namespace, out);
+runCommand(config);
+return 0;
+} catch (ArgumentParserException e) {
+parser.handleError(e);
+return 1;
+} catch (TerseException e) {
+err.println(e.getMessage());
+return 2;
+} catch (Throwable e) {
+err.println(e.getMessage());
+err.println(Utils.stackTrace(e));
+return 3;
+}
+}
+
+private static ArgumentParser parser() {
+ArgumentParser parser = 
ArgumentParsers.newArgumentParser("connect-plugin-path")
+.defaultHelp(true)
+.description("Manage plugins on the Connect plugin.path");
+
+ArgumentParser listCommand = parser.addSubparsers()
+.description("List information about plugins contained within the 
specified plugin locations")
+.dest("subcommand")
+.addParser("list");
+
+ArgumentParser[] subparsers = new ArgumentParser[] {
+listCommand,
+};
+
+for 

[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-10 Thread via GitHub


gharris1727 commented on PR #14064:
URL: https://github.com/apache/kafka/pull/14064#issuecomment-1673833354

   > We may choose to report broken plugin locations (i.e., any that has one or 
more unloadable plugins), but IMO it's better to leave this info out entirely. 
We have room to maneuver in the future with this tool if we want (the public 
interface boundaries set by the KIP are very permissive), and I'd rather we err 
on the side of simplicity for now. Total/loadable/compatible plugins seems like 
plenty.
   
   I removed the location summaries and overall summaries. Users will need to 
inspect the table to figure out if everything is compatible, or run a worker 
verification, which is the more reliable way to find incompatibilities anyway.
   
   > I'm also worried about our alias collision reporting. Right now every 
invocation of the command so far has produced a ton of warnings about SMT 
aliases colliding. It'd be nice if we could use ReplaceField$Key as an alias 
instead of just Key, but until/unless that KIP gets approved, we're stuck with 
a fairly large number of collisions with the aliases for our OOTB transforms.
   
   I think this is the right call, since we already changed it so that the 
aliases in each row of the table don't depend on other rows. The alias 
collision logic was a red herring because I accidentally used the collision 
logic in the original implementation, when it really isn't relevant to the tool 
at all.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] florin-akermann opened a new pull request, #14187: KAFKA-13197: fix GlobalKTable join/left-join semantics documentation.

2023-08-10 Thread via GitHub


florin-akermann opened a new pull request, #14187:
URL: https://github.com/apache/kafka/pull/14187

   The documentation for join/left-join KStream-GlobalKTable is outdated: the left-hand-side key may be null.
   
   Note that the second statement mentioned in the Jira 
https://issues.apache.org/jira/browse/KAFKA-13197 is no longer present.
   
   Note: The constraint that KeyValueMapper cannot return null for left joins 
or else the record gets dropped will be relaxed as part of 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams
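
   A minimal sketch of the join shape in question (topic names and types are made up; the point is that the stream-side record key plays no role in the lookup, only the key returned by the KeyValueMapper does):
   ```java
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.GlobalKTable;
   import org.apache.kafka.streams.kstream.KStream;

   public class GlobalTableLeftJoinSketch {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();

           // Records on "orders" may have a null key; the lookup key is derived from the value.
           KStream<String, String> orders = builder.stream("orders");
           GlobalKTable<String, String> customers = builder.globalTable("customers");

           KStream<String, String> enriched = orders.leftJoin(
                   customers,
                   (orderKey, orderValue) -> orderValue,                     // KeyValueMapper: selects the table key
                   (orderValue, customer) -> orderValue + " / " + customer); // ValueJoiner; customer may be null

           enriched.to("orders-enriched");
           // builder.build() would then be passed to new KafkaStreams(topology, props).
       }
   }
   ```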
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1290620328


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.

Review Comment:
   LGTM, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14103:
URL: https://github.com/apache/kafka/pull/14103#discussion_r1290589930


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -947,9 +947,15 @@ public String toString() {
 }
 
 public static class ValidString implements Validator {
-final List<String> validStrings;
+// visible for testing

Review Comment:
   I pushed a corresponding update, but I am not super happy with it... I 
believe the original idea to get the list of values from `CONFIG` is safer. 
Curious to hear what others think. In the end I am fine both ways.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-10 Thread via GitHub


kirktrue commented on PR #14156:
URL: https://github.com/apache/kafka/pull/14156#issuecomment-1673817790

   @gharris1727 Thanks for your work on this! Hopefully it will help 
KAFKA-15197 too!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13908: KAFKA-15052 Fix the flaky testBalancePartitionLeaders - part II

2023-08-10 Thread via GitHub


kirktrue commented on PR #13908:
URL: https://github.com/apache/kafka/pull/13908#issuecomment-1673815636

   @divijvaidya Can you review this with an eye to merging it? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-10 Thread via GitHub


gharris1727 commented on PR #14092:
URL: https://github.com/apache/kafka/pull/14092#issuecomment-1673813076

   System test failures appear to be due to reasons other than 
classloading/dependency issues. The producer performance tests appear to work 
normally.
   ```
   
tests/kafkatest/tests/core/zookeeper_migration_test.py::TestMigration.test_pre_migration_mode_3_4@{"metadata_quorum":"ISOLATED_KRAFT"}
   
tests/kafkatest/tests/core/zookeeper_migration_test.py::TestMigration.test_upgrade_after_3_4_migration
   
tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_PLAINTEXT"}
   
tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_SSL"}
   
tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"ISOLATED_KRAFT"}
   
tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"ZK"}
   
tests/kafkatest/tests/streams/streams_smoke_test.py::StreamsSmokeTest.test_streams@{"processing_guarantee":"at_least_once","crash":false,"metadata_quorum":"ISOLATED_KRAFT"}
   
tests/kafkatest/tests/core/downgrade_test.py::TestDowngrade.test_upgrade_and_downgrade@{"version":"2.7.1","compression_types":["none"],"static_membership":false}
   
tests/kafkatest/tests/core/downgrade_test.py::TestDowngrade.test_upgrade_and_downgrade@{"version":"2.7.1","compression_types":["none"],"static_membership":true}
   
tests/kafkatest/tests/core/downgrade_test.py::TestDowngrade.test_upgrade_and_downgrade@{"version":"2.7.1","compression_types":["zstd"],"security_protocol":"SASL_SSL"}
   
tests/kafkatest/tests/streams/streams_broker_bounce_test.py::StreamsBrokerBounceTest.test_broker_type_bounce@{"failure_mode":"hard_shutdown","broker_type":"controller","num_threads":1,"sleep_time_secs":120,"metadata_quorum":"ZK"}
   
tests/kafkatest/tests/streams/streams_broker_bounce_test.py::StreamsBrokerBounceTest.test_broker_type_bounce@{"failure_mode":"hard_shutdown","broker_type":"leader","num_threads":1,"sleep_time_secs":120,"metadata_quorum":"ZK"}
   
tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces@{"from_version":"3.1.2","to_version":"3.6.0-SNAPSHOT"}
   
tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces@{"from_version":"3.2.3","to_version":"3.6.0-SNAPSHOT"}
   
tests/kafkatest/tests/tools/replica_verification_test.py::ReplicaVerificationToolTest.test_replica_lags@{"metadata_quorum":"ISOLATED_KRAFT"}
   
tests/kafkatest/tests/tools/replica_verification_test.py::ReplicaVerificationToolTest.test_replica_lags@{"metadata_quorum":"ZK"}
   
tests/kafkatest/tests/core/network_degrade_test.py::NetworkDegradeTest.test_rate@{"task_name":"rate-1000-latency-50","device_name":"eth0","latency_ms":50,"rate_limit_kbit":100}
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-10 Thread via GitHub


C0urante commented on code in PR #14064:
URL: https://github.com/apache/kafka/pull/14064#discussion_r1290570863


##
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java:
##
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
+import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConnectPluginPath {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+public static final Object[] LIST_TABLE_COLUMNS = {
+"pluginName",
+"firstAlias",
+"secondAlias",
+"pluginVersion",
+"pluginType",
+"isLoadable",
+"hasManifest",
+"pluginLocation" // last because it is least important and most 
repetitive
+};
+public static final String NO_ALIAS = "N/A";
+
+public static void main(String[] args) {
+Exit.exit(mainNoExit(args, System.out, System.err));
+}
+
+public static int mainNoExit(String[] args, PrintStream out, PrintStream 
err) {
+ArgumentParser parser = parser();
+try {
+Namespace namespace = parser.parseArgs(args);
+Config config = parseConfig(parser, namespace, out);
+runCommand(config);
+return 0;
+} catch (ArgumentParserException e) {
+parser.handleError(e);
+return 1;
+} catch (TerseException e) {
+err.println(e.getMessage());
+return 2;
+} catch (Throwable e) {
+err.println(e.getMessage());
+err.println(Utils.stackTrace(e));
+return 3;
+}
+}
+
+private static ArgumentParser parser() {
+ArgumentParser parser = 
ArgumentParsers.newArgumentParser("connect-plugin-path")
+.defaultHelp(true)
+.description("Manage plugins on the Connect plugin.path");
+
+ArgumentParser listCommand = parser.addSubparsers()
+.description("List information about plugins contained within the 
specified plugin locations")
+.dest("subcommand")
+.addParser("list");
+
+ArgumentParser[] subparsers = new ArgumentParser[] {
+listCommand,
+};
+
+for (ArgumentParser 

[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14103:
URL: https://github.com/apache/kafka/pull/14103#discussion_r1290575646


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -947,9 +947,15 @@ public String toString() {
 }
 
 public static class ValidString implements Validator {
-final List<String> validStrings;
+// visible for testing

Review Comment:
   This part does not work: `public static final String UPGRADE_FROM_34 = 
UpgradeFromValues.UPGRADE_FROM_34.toString();` because we use
   ```
   switch ((String) upgradeFrom) {
   case StreamsConfig.UPGRADE_FROM_0100:
   ...
   }
   ```
   
   And the compiler complains that `StreamsConfig.UPGRADE_FROM_0100` must be a 
const
   ```
   error: constant string expression required
   case StreamsConfig.UPGRADE_FROM_0100:
   ```
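
   For context on the compiler error quoted above, a tiny standalone illustration (the enum and strings below are made up; only string literals and constant variables are legal as case labels):
   ```java
   public class ConstantCaseLabelExample {

       enum UpgradeFrom {
           V_0100;
           @Override
           public String toString() {
               return "0.10.0";
           }
       }

       static final String UPGRADE_FROM_0100 = "0.10.0";               // compile-time constant: usable as a case label
       static final String FROM_ENUM = UpgradeFrom.V_0100.toString();  // resolved at runtime, so NOT a constant expression

       static void handle(String upgradeFrom) {
           switch (upgradeFrom) {
               case UPGRADE_FROM_0100:   // fine: constant variable initialised from a literal
               // case FROM_ENUM:        // does not compile: "constant string expression required"
                   break;
               default:
                   break;
           }
       }
   }
   ```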
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14103:
URL: https://github.com/apache/kafka/pull/14103#discussion_r1290575646


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -947,9 +947,15 @@ public String toString() {
 }
 
 public static class ValidString implements Validator {
-final List<String> validStrings;
+// visible for testing

Review Comment:
   This part does not work: `public static final String UPGRADE_FROM_34 = 
UpgradeFromValues.UPGRADE_FROM_34.toString();`
   
   We use
   ```
   switch ((String) upgradeFrom) {
   case StreamsConfig.UPGRADE_FROM_0100:
   ...
   }
   ```
   and the compiler complains that `StreamsConfig.UPGRADE_FROM_0100` must be a 
const
   ```
   error: constant string expression required
   case StreamsConfig.UPGRADE_FROM_0100:
   ```
   
   So not sure if we gain much adding the enum? -- We would still need to 
remember to update the enum, and thus have a "split brain" problem -- getting the 
List out of the `CONFIG` object seems to be safer.
   
   Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14103:
URL: https://github.com/apache/kafka/pull/14103#discussion_r1290575646


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -947,9 +947,15 @@ public String toString() {
 }
 
 public static class ValidString implements Validator {
-final List<String> validStrings;
+// visible for testing

Review Comment:
   This part does not work: `public static final String UPGRADE_FROM_34 = 
UpgradeFromValues.UPGRADE_FROM_34.toString();` because we use
   ```
   switch ((String) upgradeFrom) {
   case StreamsConfig.UPGRADE_FROM_0100:
   ...
   }
   ```
   
   And the compiler complains that `StreamsConfig.UPGRADE_FROM_0100` must be a 
const
   ```
   error: constant string expression required
   case StreamsConfig.UPGRADE_FROM_0100:
   ```
   
   So not sure if we gain much adding the enum? -- We would still need to 
remember to update the enum, and thus have a "split brain" problem -- getting the 
List out of the `CONFIG` object seems to be safer.
   
   Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14103:
URL: https://github.com/apache/kafka/pull/14103#discussion_r1290575646


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -947,9 +947,15 @@ public String toString() {
 }
 
 public static class ValidString implements Validator {
-final List validStrings;
+// visible for testing

Review Comment:
   This part does not work: `public static final String UPGRADE_FROM_34 = 
UpgradeFromValues.UPGRADE_FROM_34.toString();` because we use
   ```
   switch ((String) upgradeFrom) {
   case StreamsConfig.UPGRADE_FROM_0100:
   ...
   }
   ```
   
   And the compiler complains that `StreamsConfig.UPGRADE_FROM_0100` must be a 
constant
   ```
   error: constant string expression required
   case StreamsConfig.UPGRADE_FROM_0100:
   ```
   
   So I am not sure we gain much by adding the enum?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14103:
URL: https://github.com/apache/kafka/pull/14103#discussion_r1290555860


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -947,9 +947,15 @@ public String toString() {
 }
 
 public static class ValidString implements Validator {
-final List validStrings;
+// visible for testing

Review Comment:
   This does not really help, because in the end we need a `List` of config 
values -- the source of truth for which config values we support is inside the 
`CONFIG` object, so we need to get it from there IMHO.
   
   If we don't want to tap into the `CONFIG` object, we can maintain a list 
ourselves (but we don't need an `enum` for it), but I think it defeats the 
purpose of the test, as we would again have a "split brain" problem and would 
need to keep the `CONFIG` and our manually maintained list in sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14103: HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14103:
URL: https://github.com/apache/kafka/pull/14103#discussion_r1290555860


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -947,9 +947,15 @@ public String toString() {
 }
 
 public static class ValidString implements Validator {
-final List validStrings;
+// visible for testing

Review Comment:
   This does not really help, because in the end we need a `List` of config 
values -- the source of truth for which config values we support is inside the 
`CONFIG` object, so we need to get it from there IMHO.
   
   If we don't want to tap into the `CONFIG` object, we can maintain a list 
ourselves, but I think it defeats the purpose of the test, as we would again 
have a "split brain" problem and would need to keep the `CONFIG` and our 
manually maintained list in sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15333) Flaky build failure throwing Connect Exception: Could not connect to server....

2023-08-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15333:
--

 Summary: Flaky build failure throwing Connect Exception: Could not 
connect to server
 Key: KAFKA-15333
 URL: https://issues.apache.org/jira/browse/KAFKA-15333
 Project: Kafka
  Issue Type: Test
  Components: connect, unit tests
Reporter: Philip Nee


We frequently observe flaky build failures with the following message. This is 
from the most recent PR post 3.5.0:

 
{code:java}
> Task :generator:testClasses UP-TO-DATE
Unexpected exception thrown.
org.gradle.internal.remote.internal.MessageIOException: Could not read message 
from '/127.0.0.1:38354'.
at 
org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:94)
at 
org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at 
org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:47)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException
at 
org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:72)
at 
org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52)
at 
org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:81)
... 6 more

> Task :streams:upgrade-system-tests-26:unitTest
org.gradle.internal.remote.internal.ConnectException: Could not connect to 
server [3156f144-9a89-4c47-91ad-88a8378ec726 port:37889, 
addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:122)
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
... 5 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #14138: Fix a race when query isUnderMinIsr

2023-08-10 Thread via GitHub


jolshan commented on code in PR #14138:
URL: https://github.com/apache/kafka/pull/14138#discussion_r1290546194


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -798,6 +803,12 @@ class Partition(val topicPartition: TopicPartition,
   // to maintain the decision maker controller's epoch in the zookeeper 
path
   controllerEpoch = partitionState.controllerEpoch
 
+  val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch

Review Comment:
   Can we add a bit more detail in the description about why we moved this to 
be before `updateAssignmentAndIsr`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build

2023-08-10 Thread via GitHub


C0urante commented on PR #14186:
URL: https://github.com/apache/kafka/pull/14186#issuecomment-1673671924

   I've opened this as a draft pull request to demonstrate that unused stubbings 
will be successfully reported with the proposed change to the Jenkinsfile. Once 
the build fails successfully, I'll revert the commit that induces the failure 
and mark this ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290483692


##
docs/streams/developer-guide/config-streams.html:
##
@@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+rack.aware.assignment.non_overlap_cost
+
+  
+
+  This configuration sets the cost of moving a task from the 
original assignment computed either by StickyTaskAssignor or
+  HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost,
+  they control whether the optimizer favors minimizing cross rack 
traffic or minimizing the movement of tasks in the existing assignment. If this 
config is set to a larger value than rack.aware.assignment.traffic_cost,
+  the optimizer will try to maintain the existing assignment 
computed by the task assignor. Note that the optimizer takes the ratio of these 
two configs into consideration of favoring maintaining existing assignment or 
minimizing traffic cost. For example, setting
+  rack.aware.assignment.non_overlap_cost to 10 and 
rack.aware.assignment.traffic_cost to 1 is more 
likely to maintain existing assignment than setting
+  rack.aware.assignment.non_overlap_cost to 100 and 
rack.aware.assignment.traffic_cost to 50.
+
+
+  The default value is null which means default non_overlap_cost 
in different assignors will be used. In StickyTaskAssignor, it has a higher default value 
than rack.aware.assignment.traffic_cost which means
+  maintaining stickiness is preferred in StickyTaskAssignor. In HighAvailabilityTaskAssignor, it has a lower default 
value than rack.aware.assignment.traffic_cost

Review Comment:
   > it has a lower default value
   
   As above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #14050: KAFKA-15220: Do not returned fenced brokers from getAliveBrokerNode

2023-08-10 Thread via GitHub


cmccabe merged PR #14050:
URL: https://github.com/apache/kafka/pull/14050


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290482358


##
docs/streams/developer-guide/config-streams.html:
##
@@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+rack.aware.assignment.non_overlap_cost
+
+  
+
+  This configuration sets the cost of moving a task from the 
original assignment computed either by StickyTaskAssignor or
+  HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost,
+  they control whether the optimizer favors minimizing cross rack 
traffic or minimizing the movement of tasks in the existing assignment. If this 
config is set to a larger value than rack.aware.assignment.traffic_cost,
+  the optimizer will try to maintain the existing assignment 
computed by the task assignor. Note that the optimizer takes the ratio of these 
two configs into consideration of favoring maintaining existing assignment or 
minimizing traffic cost. For example, setting
+  rack.aware.assignment.non_overlap_cost to 10 and 
rack.aware.assignment.traffic_cost to 1 is more 
likely to maintain existing assignment than setting
+  rack.aware.assignment.non_overlap_cost to 100 and 
rack.aware.assignment.traffic_cost to 50.
+
+
+  The default value is null which means default non_overlap_cost 
in different assignors will be used. In StickyTaskAssignor, it has a higher default value 
than rack.aware.assignment.traffic_cost which means

Review Comment:
   put `non_overlap_cost` into code markup



##
docs/streams/architecture.html:
##
@@ -167,6 +167,14 @@ Kafka
 Streams Developer Guide section.
 
+
+There is also a client side config client.rack which can 
set the rack for a Kafka Consumer. If broker side also have rack set via 
broker.rack. Then rack aware task

Review Comment:
   ```suggestion
   There is also a client config client.rack which can set 
the rack for a Kafka consumer. If brokers also have their rack set via 
broker.rack, then rack aware task
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #14186: KAFKA-14682: Report Mockito unused stubbings during Jenkins build

2023-08-10 Thread via GitHub


C0urante opened a new pull request, #14186:
URL: https://github.com/apache/kafka/pull/14186

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14682)
   
   Our Jenkinsfile is currently configured to do testing with the `unitTest` 
and `integrationTest` tasks, instead of the `test` task. These tasks use JUnit 
test category filters to distinguish between integration and non-integration 
tests. Because of https://github.com/mockito/mockito/issues/3077, if a JUnit 
test category filter is used, Mockito does not report unused stubbings. As a 
result, our CI builds currently don't catch unused stubbings.
   
   Although an upstream fix has been implemented with 
https://github.com/mockito/mockito/pull/3078, a release has not yet been 
published with that fix. Even when a new release is published, it will take 
place on the 5.x.y line, which requires JDK 11+ (see the [5.0.0 release 
notes](https://github.com/mockito/mockito/releases/tag/v5.0.0)).
   
   As a workaround, we can switch to using the `test` task instead of the 
`unitTest` and `integrationTest` tasks. If there are benefits to the two-task 
approach, we can revert to that approach once we begin work on Kafka 4.0 (which 
drops support for Java 8) and upgrade to a version of Mockito with the fix.
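   
   For illustration only (this test is not from the Kafka code base): with 
`MockitoExtension`'s default strict stubbing, a stub that the test never 
exercises is reported as an `UnnecessaryStubbingException`, which is the kind 
of problem the category-filtered builds currently miss.
   ```
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.mockito.Mockito.when;

   import java.util.List;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.extension.ExtendWith;
   import org.mockito.Mock;
   import org.mockito.junit.jupiter.MockitoExtension;

   @ExtendWith(MockitoExtension.class)
   class UnusedStubbingExample {

       @Mock
       private List<String> names;

       @Test
       void passesButLeavesAStubUnused() {
           when(names.size()).thenReturn(3); // stubbed, but never invoked below
           assertEquals(0, 0);               // the assertion never touches the mock
       }
   }
   ```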
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15332) Eligible Leader Replicas

2023-08-10 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15332:
--

 Summary: Eligible Leader Replicas
 Key: KAFKA-15332
 URL: https://issues.apache.org/jira/browse/KAFKA-15332
 Project: Kafka
  Issue Type: New Feature
Reporter: Calvin Liu
Assignee: Calvin Liu


A root ticket for KIP-966.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290485321


##
docs/streams/developer-guide/config-streams.html:
##
@@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+rack.aware.assignment.non_overlap_cost
+
+  
+
+  This configuration sets the cost of moving a task from the 
original assignment computed either by StickyTaskAssignor or
+  HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost,
+  they control whether the optimizer favors minimizing cross rack 
traffic or minimizing the movement of tasks in the existing assignment. If this 
config is set to a larger value than rack.aware.assignment.traffic_cost,
+  the optimizer will try to maintain the existing assignment 
computed by the task assignor. Note that the optimizer takes the ratio of these 
two configs into consideration of favoring maintaining existing assignment or 
minimizing traffic cost. For example, setting
+  rack.aware.assignment.non_overlap_cost to 10 and 
rack.aware.assignment.traffic_cost to 1 is more 
likely to maintain existing assignment than setting
+  rack.aware.assignment.non_overlap_cost to 100 and 
rack.aware.assignment.traffic_cost to 50.
+
+
+  The default value is null which means default non_overlap_cost 
in different assignors will be used. In StickyTaskAssignor, it has a higher default value 
than rack.aware.assignment.traffic_cost which means
+  maintaining stickiness is preferred in StickyTaskAssignor. In HighAvailabilityTaskAssignor, it has a lower default 
value than rack.aware.assignment.traffic_cost
+  which means minimizing cross rack traffic is preferred in HighAvailabilityTaskAssignor.
+
+  
+
+  
+  
+rack.aware.assignment.strategy
+
+  
+
+  This configuration sets the strategy Kafka Streams can use for 
rack aware task assignment so that cross traffic from broker to client can be 
reduced. This config will only take effect when broker.rack

Review Comment:
   ```suggestion
 This configuration sets the strategy Kafka Streams uses for 
rack aware task assignment so that cross traffic from broker to client can be 
reduced. This config will only take effect when broker.rack
   ```
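   
   For context, a minimal sketch of how these configs might be set on the 
Streams side (the config names are quoted from this docs change; the strategy 
value "min_traffic" and the cost numbers are illustrative assumptions):
   ```
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.streams.StreamsConfig;

   public class RackAwareConfigExample {
       public static Properties rackAwareProps() {
           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // client.rack on the Streams consumers; brokers must also set broker.rack.
           props.put(StreamsConfig.consumerPrefix(ConsumerConfig.CLIENT_RACK_CONFIG), "rack-a");
           // Enable rack aware assignment and bias it towards minimizing cross-rack traffic.
           props.put("rack.aware.assignment.strategy", "min_traffic");
           props.put("rack.aware.assignment.traffic_cost", 10);
           props.put("rack.aware.assignment.non_overlap_cost", 1);
           return props;
       }
   }
   ```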



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290489521


##
docs/streams/developer-guide/config-streams.html:
##
@@ -718,6 +760,24 @@ rack.aware.assignment.tags
+rack.aware.assignment.traffic_cost
+
+  
+
+  This configuration sets the cost of cross rack traffic. Together 
with rack.aware.assignment.non_overlap_cost,
+  they control whether the optimizer favors minimizing cross rack 
traffic or minimizing the movement of tasks in the existing assignment. If this 
config is set to a larger value than rack.aware.assignment.non_overlap_cost,
+  the optimizer will try to compute an assignment which minimize 
the cross rack traffic. Note that the optimizer takes the ratio of these two 
configs into consideration of favoring maintaining existing assignment or 
minimizing traffic cost. For example, setting
+  rack.aware.assignment.traffic_cost to 10 and rack.aware.assignment.non_overlap_cost to 1 is more 
likely to minimize cross rack traffic than setting
+  rack.aware.assignment.traffic_cost to 100 and rack.aware.assignment.non_overlap_cost to 50.
+
+
+  The default value is null which means default traffic cost in 
different assignors will be used. In StickyTaskAssignor, it has a lower default value than 
rack.aware.assignment.non_overlap_cost.
+  In HighAvailabilityTaskAssignor, it has a higher default 
value than rack.aware.assignment.non_overlap_cost.

Review Comment:
   Include default value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290483313


##
docs/streams/developer-guide/config-streams.html:
##
@@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+rack.aware.assignment.non_overlap_cost
+
+  
+
+  This configuration sets the cost of moving a task from the 
original assignment computed either by StickyTaskAssignor or
+  HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost,
+  they control whether the optimizer favors minimizing cross rack 
traffic or minimizing the movement of tasks in the existing assignment. If this 
config is set to a larger value than rack.aware.assignment.traffic_cost,
+  the optimizer will try to maintain the existing assignment 
computed by the task assignor. Note that the optimizer takes the ratio of these 
two configs into consideration of favoring maintaining existing assignment or 
minimizing traffic cost. For example, setting
+  rack.aware.assignment.non_overlap_cost to 10 and 
rack.aware.assignment.traffic_cost to 1 is more 
likely to maintain existing assignment than setting
+  rack.aware.assignment.non_overlap_cost to 100 and 
rack.aware.assignment.traffic_cost to 50.
+
+
+  The default value is null which means default non_overlap_cost 
in different assignors will be used. In StickyTaskAssignor, it has a higher default value 
than rack.aware.assignment.traffic_cost which means

Review Comment:
   > it has a higher default value
   
   Should we include the value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290445751


##
docs/streams/architecture.html:
##
@@ -167,6 +167,14 @@ Kafka
 Streams Developer Guide section.
 
+
+There is also a client side config client.rack which can 
set the rack for a Kafka Consumer. If broker side also have rack set via 
broker.rack. Then rack aware task
+assignment can be enabled to compute a task assignment which can 
reduce cross rack traffic by try to assign tasks to clients with the same rack. 
See rack.aware.assignment.strategy in
+Kafka
 Streams Developer Guide.
+Note that client.rack can also be used to distribute 
standby tasks on different "rack" from the active ones, which has a similar 
functionality as rack.aware.assignment.tags.
+Currently, rack.aware.assignment.tag takes precedence in 
distributing standby tasks which means if both configs present, 
rack.aware.assignment.tag will be used for distributing
+standby tasks on different "rack" from the active ones because it can 
configure more tag keys.

Review Comment:
   ```suggestion
   standby tasks on different racks from the active ones because it can 
configure more tag keys.
   ```



##
docs/streams/architecture.html:
##
@@ -167,6 +167,14 @@ Kafka
 Streams Developer Guide section.
 
+
+There is also a client side config client.rack which can 
set the rack for a Kafka Consumer. If broker side also have rack set via 
broker.rack. Then rack aware task
+assignment can be enabled to compute a task assignment which can 
reduce cross rack traffic by try to assign tasks to clients with the same rack. 
See rack.aware.assignment.strategy in
+Kafka
 Streams Developer Guide.

Review Comment:
   Drop this line



##
docs/streams/architecture.html:
##
@@ -167,6 +167,14 @@ Kafka
 Streams Developer Guide section.
 
+
+There is also a client side config client.rack which can 
set the rack for a Kafka Consumer. If broker side also have rack set via 
broker.rack. Then rack aware task
+assignment can be enabled to compute a task assignment which can 
reduce cross rack traffic by try to assign tasks to clients with the same rack. 
See rack.aware.assignment.strategy in

Review Comment:
   ```suggestion
   assignment can be enabled via 
rack.aware.assignment.strategy (cf.  Kafka
 Streams Developer Guide) to compute a task assignment which can reduce 
cross rack traffic by trying to assign tasks to clients with the same rack.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290489338


##
docs/streams/developer-guide/config-streams.html:
##
@@ -718,6 +760,24 @@ rack.aware.assignment.tags
+rack.aware.assignment.traffic_cost
+
+  
+
+  This configuration sets the cost of cross rack traffic. Together 
with rack.aware.assignment.non_overlap_cost,
+  they control whether the optimizer favors minimizing cross rack 
traffic or minimizing the movement of tasks in the existing assignment. If this 
config is set to a larger value than rack.aware.assignment.non_overlap_cost,
+  the optimizer will try to compute an assignment which minimize 
the cross rack traffic. Note that the optimizer takes the ratio of these two 
configs into consideration of favoring maintaining existing assignment or 
minimizing traffic cost. For example, setting
+  rack.aware.assignment.traffic_cost to 10 and rack.aware.assignment.non_overlap_cost to 1 is more 
likely to minimize cross rack traffic than setting
+  rack.aware.assignment.traffic_cost to 100 and rack.aware.assignment.non_overlap_cost to 50.
+
+
+  The default value is null which means default traffic cost in 
different assignors will be used. In StickyTaskAssignor, it has a lower default value than 
rack.aware.assignment.non_overlap_cost.

Review Comment:
   As above: include default value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290445388


##
docs/streams/architecture.html:
##
@@ -167,6 +167,14 @@ Kafka
 Streams Developer Guide section.
 
+
+There is also a client side config client.rack which can 
set the rack for a Kafka Consumer. If broker side also have rack set via 
broker.rack. Then rack aware task
+assignment can be enabled to compute a task assignment which can 
reduce cross rack traffic by try to assign tasks to clients with the same rack. 
See rack.aware.assignment.strategy in
+Kafka
 Streams Developer Guide.
+Note that client.rack can also be used to distribute 
standby tasks on different "rack" from the active ones, which has a similar 
functionality as rack.aware.assignment.tags.

Review Comment:
   ```suggestion
   Note that client.rack can also be used to distribute 
standby tasks to different racks from the active ones, which has a similar 
functionality as rack.aware.assignment.tags.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-10 Thread via GitHub


mjsax commented on code in PR #14181:
URL: https://github.com/apache/kafka/pull/14181#discussion_r1290486200


##
docs/streams/developer-guide/config-streams.html:
##
@@ -685,6 +688,45 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+rack.aware.assignment.non_overlap_cost
+
+  
+
+  This configuration sets the cost of moving a task from the 
original assignment computed either by StickyTaskAssignor or
+  HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost,
+  they control whether the optimizer favors minimizing cross rack 
traffic or minimizing the movement of tasks in the existing assignment. If this 
config is set to a larger value than rack.aware.assignment.traffic_cost,
+  the optimizer will try to maintain the existing assignment 
computed by the task assignor. Note that the optimizer takes the ratio of these 
two configs into consideration of favoring maintaining existing assignment or 
minimizing traffic cost. For example, setting
+  rack.aware.assignment.non_overlap_cost to 10 and 
rack.aware.assignment.traffic_cost to 1 is more 
likely to maintain existing assignment than setting
+  rack.aware.assignment.non_overlap_cost to 100 and 
rack.aware.assignment.traffic_cost to 50.
+
+
+  The default value is null which means default non_overlap_cost 
in different assignors will be used. In StickyTaskAssignor, it has a higher default value 
than rack.aware.assignment.traffic_cost which means
+  maintaining stickiness is preferred in StickyTaskAssignor. In HighAvailabilityTaskAssignor, it has a lower default 
value than rack.aware.assignment.traffic_cost
+  which means minimizing cross rack traffic is preferred in HighAvailabilityTaskAssignor.
+
+  
+
+  
+  
+rack.aware.assignment.strategy
+
+  
+
+  This configuration sets the strategy Kafka Streams can use for 
rack aware task assignment so that cross traffic from broker to client can be 
reduced. This config will only take effect when broker.rack
+  is set on broker side and client.rack is set on Kafka Streams side. There are 
two settings for this config:

Review Comment:
   > is set on broker side
   
   Either "is set on the brokers" or "is set broker side"
   
   Also for `is set on Kafka Streams side`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-10 Thread via GitHub


gharris1727 merged PR #14159:
URL: https://github.com/apache/kafka/pull/14159


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15291) Implement Versioned interfaces in common Connect plugins

2023-08-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15291.
-
Resolution: Fixed

> Implement Versioned interfaces in common Connect plugins
> 
>
> Key: KAFKA-15291
> URL: https://issues.apache.org/jira/browse/KAFKA-15291
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Aindriú Lavelle
>Priority: Major
> Fix For: 3.6.0
>
>
> In KAFKA-14863, we changed the plugin scanning logic to allow plugins to 
> opt-in to the Versioned interface individually, when previously it was 
> limited to Connector plugins.
> To take advantage of this change, we should have all of the plugins built via 
> the Kafka repository opt-in, and provide the environment's Kafka version from 
> the AppInfoParser.getVersion().
> See the FileStreamSinkConnector as an example of the version() method 
> implementation.
> All subclasses of Converter, HeaderConverter, Transformation, Predicate, and 
> ConnectorClientConfigOverridePolicy should implement Versioned. The 
> interfaces themselves will _not_ extend Versioned, as that would be a 
> backwards-incompatible change.
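
As a purely illustrative sketch (the class name is hypothetical), a plugin can 
opt in by implementing Versioned and returning the runtime Kafka version from 
AppInfoParser.getVersion(), as the ticket describes:
{code:java}
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class VersionedTransformation<R extends ConnectRecord<R>>
        implements Transformation<R>, Versioned {

    @Override
    public String version() {
        // Report the version of the Kafka libraries this plugin was built with.
        return AppInfoParser.getVersion();
    }
}
{code}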



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-10 Thread via GitHub


gharris1727 commented on PR #14159:
URL: https://github.com/apache/kafka/pull/14159#issuecomment-1673635408

   Flaky test failures appear unrelated, and tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-10 Thread via GitHub


mimaison commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1290277580


##
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+private final ClusterInstance cluster;
+int broker1 = 0;
+int broker2 = 1;
+int broker3 = 2;
+
+public LeaderElectionCommandTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@BeforeEach
+void setup(ClusterConfig clusterConfig) {
+TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+
clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(),
 "false");
+
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(),
 "true");
+
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(),
 "1");
+
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(),
 "1000");
+
clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(),
 "2");
+}
+
+@ClusterTest
+public void testAllTopicPartition() throws InterruptedException, 
ExecutionException {
+String topic = "unclean-topic";
+int partition = 0;
+List assignment = Arrays.asList(broker2, broker3);
+
+cluster.waitForReadyBrokers();
+Admin client = cluster.createAdminClient();
+Map> partitionAssignment = new HashMap<>();
+partitionAssignment.put(partition, assignment);
+
+createTopic(client, topic, partitionAssignment);
+
+TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+TestUtils.assertLeader(client, topicPartition, broker2);
+cluster.shutdownBroker(broker3);
+TestUtils.waitForBrokersOutOfIsr(client,
+
JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+
JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+);
+cluster.shutdownBroker(broker2);
+TestUtils.assertNoLeader(client, topicPartition);
+cluster.startBroker(broker3);
+TestUtils.waitForOnlineBroker(client, broker3);
+
+LeaderElectionCommand.main(
+new String[] {

Review Comment:
   Since `main()` takes varargs, we don't need to create an array here.
   Same below
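   
   As a small illustrative sketch (not the actual command or test code), the 
difference the suggestion points at:
   ```
   public class VarargsExample {
       static void run(String... args) {
           System.out.println(String.join(" ", args));
       }

       public static void main(String[] args) {
           // Equivalent calls: the explicit array wrapper is unnecessary.
           run(new String[] {"--help", "--version"});
           run("--help", "--version");
       }
   }
   ```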



##
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache 

[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-10 Thread via GitHub


kamalcph commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1290377829


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 /**
  * Deletes the internal topic partition info if delete flag is set as true.
  *
- * @param topicPartition topic partition to be stopped.
+ * @param topicPartitions topic partitions that needs to be stopped.
  * @param delete flag to indicate whether the given topic 
partitions to be deleted or not.
  */
-public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+public void stopPartitions(Set topicPartitions,
+   boolean delete,
+   BiConsumer 
errorHandler) {
+LOGGER.debug("Stopping {} partitions, delete: {}", 
topicPartitions.size(), delete);
+Set topicIdPartitions = topicPartitions.stream()
+.filter(topicIdByPartitionMap::containsKey)
+.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), 
tp))
+.collect(Collectors.toSet());
+
+topicIdPartitions.forEach(tpId -> {
+try {
+RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+if (task != null) {
+LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+task.cancel();
+}
+if (delete) {
+LOGGER.info("Deleting the remote log segments task for 
partition: {}", tpId);
+deleteRemoteLogPartition(tpId);
+}
+} catch (Exception ex) {
+errorHandler.accept(tpId.topicPartition(), ex);
+}
+});
+
 if (delete) {
-// Delete from internal datastructures only if it is to be deleted.
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-LOGGER.debug("Removed partition: {} from topicPartitionIds", 
topicIdPartition);
+// NOTE: this#stopPartitions method is called when Replica state 
changes to Offline and ReplicaDeletionStarted
+remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
+topicPartitions.forEach(topicIdByPartitionMap::remove);
+}
+}
+
+private void deleteRemoteLogPartition(TopicIdPartition partition) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+List metadataList = new ArrayList<>();
+
remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add);
+
+List deleteSegmentStartedEvents = 
metadataList.stream()
+.map(metadata ->
+new 
RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), 
time.milliseconds(),
+RemoteLogSegmentState.DELETE_SEGMENT_STARTED, 
brokerId))
+.collect(Collectors.toList());
+publishEvents(deleteSegmentStartedEvents).get();
+
+// KAFKA-15166: Instead of deleting the segment one by one. If the 
underlying RemoteStorageManager supports deleting
+// a partition, then we can call that API instead.

Review Comment:
   Done. Updated the comment to track the KAFKA-15313 ticket.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-10 Thread via GitHub


kamalcph commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1290376710


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 /**
  * Deletes the internal topic partition info if delete flag is set as true.
  *
- * @param topicPartition topic partition to be stopped.
+ * @param topicPartitions topic partitions that needs to be stopped.
  * @param delete flag to indicate whether the given topic 
partitions to be deleted or not.
  */
-public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+public void stopPartitions(Set topicPartitions,

Review Comment:
   RemoteIndexCache does not maintain the indexes by partition but by 
segment-uuid. Traversing the cache to remove those indexes would be expensive. 
Since the `RemoteIndexCache` is an LRU cache, those indexes will expire 
eventually. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-10 Thread via GitHub


kamalcph commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1290369199


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 /**
  * Deletes the internal topic partition info if delete flag is set as true.
  *
- * @param topicPartition topic partition to be stopped.
+ * @param topicPartitions topic partitions that needs to be stopped.
  * @param delete flag to indicate whether the given topic 
partitions to be deleted or not.
  */
-public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+public void stopPartitions(Set topicPartitions,
+   boolean delete,
+   BiConsumer 
errorHandler) {
+LOGGER.debug("Stopping {} partitions, delete: {}", 
topicPartitions.size(), delete);
+Set topicIdPartitions = topicPartitions.stream()
+.filter(topicIdByPartitionMap::containsKey)
+.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), 
tp))
+.collect(Collectors.toSet());
+
+topicIdPartitions.forEach(tpId -> {
+try {
+RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+if (task != null) {
+LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+task.cancel();
+}
+if (delete) {
+LOGGER.info("Deleting the remote log segments task for 
partition: {}", tpId);
+deleteRemoteLogPartition(tpId);
+}
+} catch (Exception ex) {
+errorHandler.accept(tpId.topicPartition(), ex);
+}
+});
+
 if (delete) {
-// Delete from internal datastructures only if it is to be deleted.
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-LOGGER.debug("Removed partition: {} from topicPartitionIds", 
topicIdPartition);
+// NOTE: this#stopPartitions method is called when Replica state 
changes to Offline and ReplicaDeletionStarted
+remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
+topicPartitions.forEach(topicIdByPartitionMap::remove);
+}
+}
+
+private void deleteRemoteLogPartition(TopicIdPartition partition) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+List metadataList = new ArrayList<>();
+
remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add);
+
+List deleteSegmentStartedEvents = 
metadataList.stream()
+.map(metadata ->
+new 
RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), 
time.milliseconds(),
+RemoteLogSegmentState.DELETE_SEGMENT_STARTED, 
brokerId))
+.collect(Collectors.toList());
+publishEvents(deleteSegmentStartedEvents).get();
+
+// KAFKA-15166: Instead of deleting the segment one by one. If the 
underlying RemoteStorageManager supports deleting
+// a partition, then we can call that API instead.
+for (RemoteLogSegmentMetadata metadata: metadataList) {
+remoteLogStorageManager.deleteLogSegmentData(metadata);

Review Comment:
   It's already mentioned in the javadoc, so I am not repeating it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-10 Thread via GitHub


kamalcph commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1290366372


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
 }
 partitionsToDelete += topicPartition
   }
+  if (stopPartition.deleteRemoteLog)
+remotePartitionsToDelete += topicPartition
+
   // If we were the leader, we may have some operations still waiting for 
completion.
   // We force completion to prevent them from timing out.
   completeDelayedFetchOrProduceRequests(topicPartition)
 }
 
 // Third delete the logs and checkpoint.
 val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {

Review Comment:
   Removed the `remoteStorageErrorHandler`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


