Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663537165


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -730,7 +922,20 @@ private void assertAssignment(
 assertEquals(expectedAssignment.size(), 
computedGroupAssignment.members().size());
 for (String memberId : computedGroupAssignment.members().keySet()) {
 Map> computedAssignmentForMember = 
computedGroupAssignment.members().get(memberId).partitions();
-assertEquals(expectedAssignment.get(memberId), 
computedAssignmentForMember);
+Map> expectedAssignmentForMember = 
expectedAssignment.get(memberId);
+
+// Compare the content of the maps by entries.
+assertEquals(expectedAssignmentForMember.size(), 
computedAssignmentForMember.size());
+for (Map.Entry> entry : 
expectedAssignmentForMember.entrySet()) {
+Set expectedSet = entry.getValue();
+Set computedSet = 
computedAssignmentForMember.get(entry.getKey());
+
+// Convert both sets to HashSet for comparison.
+Set normalizedExpectedSet = new 
HashSet<>(expectedSet);
+Set normalizedComputedSet = new 
HashSet<>(computedSet);
+
+assertEquals(normalizedExpectedSet, normalizedComputedSet);
+}

Review Comment:
   yeah, I think it didn't work initially, but changed it now
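   The entry-by-entry comparison in the diff above can be sketched in isolation as follows. This is a minimal sketch, not the PR's code: the class name `AssignmentComparison`, the method `sameContent`, and the `String`/`Integer` type parameters are assumptions for illustration, since the archive stripped the generic types from the quoted diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class AssignmentComparison {

    // Hypothetical helper: true when both maps have the same keys and each
    // key maps to the same elements, whatever Set implementation holds them.
    public static <K, V> boolean sameContent(Map<K, Set<V>> expected, Map<K, Set<V>> computed) {
        if (expected.size() != computed.size()) {
            return false;
        }
        for (Map.Entry<K, Set<V>> entry : expected.entrySet()) {
            Set<V> computedSet = computed.get(entry.getKey());
            if (computedSet == null) {
                return false;
            }
            // Copy both sides into HashSet so the comparison depends only on
            // the elements, not on the equals() of a custom Set type.
            Set<V> normalizedExpected = new HashSet<>(entry.getValue());
            Set<V> normalizedComputed = new HashSet<>(computedSet);
            if (!normalizedExpected.equals(normalizedComputed)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> expected = new HashMap<>();
        expected.put("topic1", new HashSet<>(Arrays.asList(0, 1)));

        Map<String, Set<Integer>> computed = new HashMap<>();
        computed.put("topic1", new TreeSet<>(Arrays.asList(1, 0)));

        System.out.println(sameContent(expected, computed));
    }
}
```

   Copying into `HashSet` only matters when one side is a set type whose `equals` does not follow the `java.util.Set` contract; for standard JDK sets a direct `equals` would already compare by content.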



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-17064) New consumer assign should update assignment in background thread

2024-07-02 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-17064:
-

Assignee: PoAn Yang

> New consumer assign should update assignment in background thread
> -
>
> Key: KAFKA-17064
> URL: https://issues.apache.org/jira/browse/KAFKA-17064
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> With the new async consumer, the subscriptionState object is shared between 
> the app thread and the background thread, but in principle all updates to the 
> assignment should happen in the background thread, to avoid race conditions. 
> Note that most updates to the assignment already happen in the background, 
> as a result of app event processing (unsubscribe, reconciliations, etc.). 
> We've faced such races in places like unsubscribe and close, fixed by 
> ensuring that all assignment updates happen in the background, and this also 
> needs to be reviewed for consumer.assign. The current implementation 
> triggers an AssignmentChange event that is processed in the background, but 
> that event does not actually change the assignment. It only commits offsets, 
> and the assignment is updated in the app thread by calling 
> subscriptionState.assignFromUser. 
> We should consider moving the assignment update to the background thread, as 
> part of the AssignmentChangeEvent.  
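
The proposal in the description above (apply the assignment only on the background thread, as part of the event) can be illustrated with a small single-writer sketch. Everything here is hypothetical and stands apart from the Kafka code base: `BackgroundAssignmentSketch`, its nested `AssignmentChangeEvent`, and the use of plain strings for partitions are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundAssignmentSketch {

    // Hypothetical event carrying the requested assignment to the background thread.
    static final class AssignmentChangeEvent {
        final Set<String> partitions;
        final CompletableFuture<Void> done = new CompletableFuture<>();

        AssignmentChangeEvent(Set<String> partitions) {
            this.partitions = new HashSet<>(partitions);
        }
    }

    private final BlockingQueue<AssignmentChangeEvent> queue = new LinkedBlockingQueue<>();
    // Written only by the background thread; read by any thread.
    private volatile Set<String> assignment = Collections.emptySet();
    private final Thread background;

    public BackgroundAssignmentSketch() {
        background = new Thread(this::runLoop, "background-thread");
        background.setDaemon(true);
        background.start();
    }

    // Background loop: the only place where the assignment is mutated.
    private void runLoop() {
        try {
            while (true) {
                AssignmentChangeEvent event = queue.take();
                assignment = Collections.unmodifiableSet(event.partitions);
                event.done.complete(null);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Called from the app thread: enqueue the change and wait until the
    // background thread has applied it, instead of mutating state directly.
    public void assign(Set<String> partitions) {
        AssignmentChangeEvent event = new AssignmentChangeEvent(partitions);
        queue.add(event);
        event.done.join();
    }

    public Set<String> assignment() {
        return assignment;
    }

    public static void main(String[] args) {
        BackgroundAssignmentSketch consumer = new BackgroundAssignmentSketch();
        consumer.assign(Collections.singleton("topic-0"));
        System.out.println(consumer.assignment());
    }
}
```

With a single writer, the app thread never races with reconciliation or unsubscribe handling over the same state; the cost is that `assign` has to block on (or otherwise observe) the completion of the event.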



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-07-02 Thread via GitHub


yashmayya commented on code in PR #15302:
URL: https://github.com/apache/kafka/pull/15302#discussion_r1663466038


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -976,27 +988,53 @@ private String 
modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlt
  *
  * @param connectorName the name of the sink connector whose offsets are 
to be verified
  * @param expectedTopic the name of the Kafka topic that the sink 
connector is consuming from
- * @param expectedPartitions the number of partitions that exist for the 
Kafka topic
+ * @param expectedPartitions the number of partitions that exist for the 
Kafka<<< topic

Review Comment:
   Stray change?






Re: [PR] KAFKA-16918: TestUtils#assertFutureThrows should use future.get with timeout [kafka]

2024-07-02 Thread via GitHub


showuon commented on code in PR #16264:
URL: https://github.com/apache/kafka/pull/16264#discussion_r1663413976


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -558,14 +558,13 @@ public static Set 
generateRandomTopicPartitions(int numTopic, in
  */
 public static  T assertFutureThrows(Future future, 
Class exceptionCauseClass) {
 try {
-future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.SECONDS);
+future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS);
 fail("Future should throw expected exception " + 
exceptionCauseClass.getSimpleName() + " but succeed.");
 } catch (TimeoutException | InterruptedException | ExecutionException 
e) {
-assertInstanceOf(exceptionCauseClass, e.getCause(),
-"Unexpected exception cause " + e.getCause());
+assertInstanceOf(exceptionCauseClass, e.getCause(), "Expected a" + 
exceptionCauseClass.getSimpleName() + "but got" + e.getCause());
 return exceptionCauseClass.cast(e.getCause());
 }
-return null;
+throw new RuntimeException("Future should throw expected exception but 
unexpected error happened.");

Review Comment:
   I saw you addressed 2 of my 3 comments:
   
   > 1. DEFAULT_MAX_WAIT_MS is a millisecond time unit, not seconds. Same 
comment applied to L583.
   
   It's fixed now. Thanks.
   
   > 2. Now, if the future throws TimeoutException or InterruptedException, it 
will fail as expected. But what error message will we get? In your opinion, do 
you think that's helpful for troubleshooting?
   
   Could you first let me know what error message we will get when 
`TimeoutException` or `InterruptedException` is thrown?
   
   > 3. What happened if the future throws CancellationException? return null? 
Is there any better way to handle it?
   
   Now, it will throw RuntimeException, which works, too. But I think we can 
directly `fail` the test, like L562 did.
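
   One way to address the three points above is to handle each failure mode in its own catch block. The following is a minimal sketch under stated assumptions, not the code from the PR: it uses plain `AssertionError` instead of JUnit's `fail`/`assertInstanceOf` so that it is self-contained, and the class name `FutureAssertions`, the extra `maxWaitMs` overload, and the 15-second default are hypothetical. A `TimeoutException` thrown by `Future.get` normally carries no cause, so reporting `e.getCause()` for it would most likely just print `null`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureAssertions {

    // Assumed default; the value used in the real TestUtils may differ.
    static final long DEFAULT_MAX_WAIT_MS = 15_000L;

    public static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> expectedCause) {
        return assertFutureThrows(future, expectedCause, DEFAULT_MAX_WAIT_MS);
    }

    public static <T extends Throwable> T assertFutureThrows(Future<?> future,
                                                             Class<T> expectedCause,
                                                             long maxWaitMs) {
        try {
            // The wait is expressed in milliseconds, matching the constant's unit.
            future.get(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (!expectedCause.isInstance(cause)) {
                throw new AssertionError(
                    "Expected a " + expectedCause.getSimpleName() + " but got " + cause, cause);
            }
            return expectedCause.cast(cause);
        } catch (TimeoutException e) {
            // No cause to inspect here, so report the timeout itself.
            throw new AssertionError("Future did not complete within " + maxWaitMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        } catch (CancellationException e) {
            throw new AssertionError("Future was cancelled instead of failing with "
                + expectedCause.getSimpleName(), e);
        }
        // Reached only when the future completed normally.
        throw new AssertionError("Future should throw " + expectedCause.getSimpleName()
            + " but succeeded");
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        IllegalStateException cause = assertFutureThrows(failed, IllegalStateException.class);
        System.out.println(cause.getMessage());
    }
}
```

   Separate catch blocks keep the failure message specific to what actually went wrong, which is the troubleshooting concern raised in point 2.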
   






Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-02 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1663408286


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() {
 assertEquals(-1d, getMetricValue(metrics, 
rebalanceMetricsManager.lastRebalanceSecondsAgo));
 }
 
+@Test
+public void testLeaveGroupWhenStateIsFatal() {
+MembershipManagerImpl membershipManager = 
createMemberInStableState(null);
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+membershipManager.transitionToFatal();
+assertEquals(MemberState.FATAL, membershipManager.state());
+
+subscriptionState.assignFromUser(Collections.singleton(new 
TopicPartition("topic", 0)));
+assertEquals(1, subscriptionState.numAssignedPartitions());

Review Comment:
   I'm not sure we would ever be able to reach the path we're mocking here 
with a real consumer, so I'm wondering whether this is even a valid scenario 
to test (this test, and the ones below for fenced and stale too).  
   
   A member in fatal/fenced/stale state means it was part of a group (automatic 
assignment after subscribe), and then we're mocking a manual assignment that 
takes effect and is cleared on leaveGroup. I expect that would never be 
possible because the consumer does not allow mixing subscription types, so 
the call to assign would fail before actually updating the assignment and 
numAssignedPartitions (a consumer would have to unsubscribe or assign with 
empty to then be able to manually assign the topic0 partition). 
   
   Just for the record, I totally see the value in 
`testLeaveGroupWhenStateIsUnsubscribe`, which covers the gap we had and 
is a valid combination since it's not mixing subscription types, but I 
struggle to see this call to assign happening when a member is 
fatal/fenced/stale.






Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-02 Thread via GitHub


lianetm commented on PR #16449:
URL: https://github.com/apache/kafka/pull/16449#issuecomment-2204975848

   Hey @FrankYang0529, thanks a lot for the fix! Left some comments. 





Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-02 Thread via GitHub


lianetm commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1663411634


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() {
 verify(backgroundEventReaper).reap(time.milliseconds());
 }
 
+@Test
+public void testUnsubscribeWithoutGroupId() {
+consumer = newConsumerWithoutGroupId();
+completeFetchedCommittedOffsetApplicationEventExceptionally(new 
TimeoutException());
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));

Review Comment:
   Do we need this? I wouldn't expect to have to mock anything related to 
committed offsets or fetching, because we're not polling in the test (or using 
committed offsets in any way), right?  






Re: [PR] KAFKA-15265: Remote fetch throttle metrics [kafka]

2024-07-02 Thread via GitHub


showuon commented on PR #16087:
URL: https://github.com/apache/kafka/pull/16087#issuecomment-2204973870

   @abhijeetk88, there is a format error. Please help fix it.
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16087/7/pipeline/10







Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


frankvicky commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204956965

   > @frankvicky , any thoughts about it? I'd also like to hear since you are 
the author of this PR.
   
   Hi @showuon!
   The reason why broker.id.generation.enable might need to be disabled has 
been explained more precisely by @chia7712 than in my previous explanation.
   
   Regarding the second point from @chia7712, I'm thinking about the potential 
drawbacks of allowing node.id to be mutable.
   
   IMHO, could these changes lead to inconsistencies in cluster coordination 
and communication? Might they also cause unpredictable behavior?





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204940529

   > If my understanding is correct, the possible error is the broker id will 
be ignored and the generated ID might exceed the range of max id, which cause 
the error reported in JIRA. Is that right?
   
   Yep, ignoring the broker id passes the reserved ids check, but it fails on 
the default node.id. Explicitly defining "broker.id=generated id" updates 
node.id so that KRaft components can run in the zk broker, but it fails the 
reserved ids check.
   
   > We should take the generated ID into node.id.
   
   Yes. Specifically, we can also define node.id when updating broker.id 
manually, no matter how we get the broker.id. 





[jira] [Commented] (KAFKA-17064) New consumer assign should update assignment in background thread

2024-07-02 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862635#comment-17862635
 ] 

Lianet Magrans commented on KAFKA-17064:


Sure, you can take it, thanks for helping out! Let me know if you have any 
other questions, and ping me when there is a PR, happy to help with the review. 



[jira] [Updated] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API

2024-07-02 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-17041:
---
Attachment: image-2024-07-03-10-22-20-432.png

> Add pagination when describe large set of metadata via Admin API 
> -
>
> Key: KAFKA-17041
> URL: https://issues.apache.org/jira/browse/KAFKA-17041
> Project: Kafka
>  Issue Type: Task
>  Components: admin
>Reporter: Omnia Ibrahim
>Priority: Major
> Attachments: image-2024-07-03-10-21-15-315.png, 
> image-2024-07-03-10-22-20-432.png
>
>
> Some requests made via the Admin API time out on large clusters or clusters 
> with a large set of specific metadata. For example, OffsetFetchRequest and 
> DescribeLogDirsRequest time out due to a large number of partitions in the 
> cluster. Also, DescribeProducersRequest and ListTransactionsRequest time out 
> due to too many short-lived PIDs or too many hanging transactions.
> [KIP-1062: Introduce Pagination for some requests used by Admin 
> API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1062%3A+Introduce+Pagination+for+some+requests+used+by+Admin+API]





[jira] [Commented] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API

2024-07-02 Thread Lin Siyuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862634#comment-17862634
 ] 

Lin Siyuan commented on KAFKA-17041:


!image-2024-07-03-10-22-20-432.png! Hello [~omnia_h_ibrahim] , there is a small 
word mistake here



[jira] [Updated] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API

2024-07-02 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-17041:
---
Attachment: image-2024-07-03-10-21-15-315.png



[jira] [Assigned] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks

2024-07-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15524:
-

Assignee: (was: Chris Egerton)

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
> --
>
> Key: KAFKA-15524
> URL: https://issues.apache.org/jira/browse/KAFKA-15524
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky, flaky-test
>
> Last seen: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/]
>  
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out.
>     at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>     at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401)
>     at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392)
>     at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>     at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> 

[jira] [Reopened] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks

2024-07-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-15524:
---


[jira] [Resolved] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks

2024-07-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15524.
---
Resolution: Later

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
> --
>
> Key: KAFKA-15524
> URL: https://issues.apache.org/jira/browse/KAFKA-15524
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky, flaky-test
>
> Last seen: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/]
>  
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out.
>     at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>     at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401)
>     at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392)
>     at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>     at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> 

Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-07-02 Thread via GitHub


C0urante commented on code in PR #15302:
URL: https://github.com/apache/kafka/pull/15302#discussion_r1663319861


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -801,6 +806,11 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
 
         connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1,
                 "Connector tasks did not start in time.");
 
+        // Make sure the tasks' consumers have had a chance to actually form a group
+        // (otherwise, the reset request will succeed because there won't be any active consumers)
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION,

Review Comment:
   Great point, I've removed mention of KAFKA-15524 from this PR.






[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-02 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-17066:
---
Description: 
The updateFetchPositions func in the new consumer performs several actions 
based on the assigned partitions from the subscriptionState. The way it's 
currently implemented, it fetches committed offsets for partitions that 
required a position (retrieved from subscription state in the app thread), and 
then resets positions for the partitions still needing one (retrieved from the 
subscription state but in the background thread). 
This is problematic, given that the assignment/subscriptionState may change in 
the background thread at any time (e.g. new partitions reconciled), so we could 
end up resetting positions to the partition offsets for a partition for which 
we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers 
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
partitions requiring a position (taking them from 
subscriptions.initializingPartitions()). This will fetch committed offsets for 
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation 
(adds it to the subscription state as INITIALIZING, requires a position)

 * app thread -> updateFetchPositions resets positions for all partitions that 
still don't have a valid position after initWithCommittedOffsetsIfNeeded 
(taking them from subscriptionState.partitionsNeedingReset). This will 
mistakenly consider that it should reset tp1 to the partition offsets, when in 
reality it never even tried fetching the committed offsets for it because it 
wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 

We should consider moving the updateFetchPositions as a single event to the 
background, that would safely use the subscriptionState object and apply all 
actions involved in the updateFetchPositions to the same consistent set of 
partitions assigned at that moment. 

  was:
{color:red}colored text{color}The updateFetchPositions func in the new consumer 
performs several actions based on the assigned partitions from the 
subscriptionState. The way it's currently implemented, it fetches committed 
offsets for partitions that required a position (retrieved from subscription 
state in the app thread), and then resets positions for the partitions still 
needing one (retrieved from the subscription state but in the background 
thread). 
This is problematic, given that the assignment/subscriptionState may change in 
the background thread at any time (e.g. new partitions reconciled), so we could 
end up resetting positions to the partition offsets for a partition for which 
we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers 
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
partitions requiring a position (taking them from 
subscriptions.initializingPartitions()). This will fetch committed offsets for 
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation 
(adds it to the subscription state as INITIALIZING, requires a position)
 * app thread -> updateFetchPositions resets positions for all partitions that 
still don't have a valid position after initWithCommittedOffsetsIfNeeded 
(taking them from subscriptionState.partitionsNeedingReset). This will 
mistakenly consider that it should reset tp1 to the partition offsets, when in 
reality it never even tried fetching the committed offsets for it because it 
wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 

We should consider moving the updateFetchPositions as a single event to the 
background, that would safely use the subscriptionState object and apply all 
actions involved in the updateFetchPositions to the same consistent set of 
partitions assigned at that moment. 
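
The interleaving described above can be sketched in a few lines of Java. This is a simplified, single-threaded model and not Kafka's implementation: `SketchSubscriptionState`, `UpdateFetchPositionsSketch`, and their methods are hypothetical names, and the `betweenSteps` callback stands in for the background thread reconciling a new partition between the two reads. The sketch also assumes that every partition whose committed offsets were fetched ends up with a valid position.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in for the shared subscription state; not Kafka's real class. */
final class SketchSubscriptionState {
    // Partitions that are assigned but do not have a valid position yet.
    private final Set<String> needingPosition = new LinkedHashSet<>();

    synchronized void assign(String partition) {
        needingPosition.add(partition);
    }

    synchronized Set<String> snapshotNeedingPosition() {
        return new LinkedHashSet<>(needingPosition);
    }
}

final class UpdateFetchPositionsSketch {
    /**
     * Problematic flow: the shared state is read twice. Returns the partitions
     * that would be reset even though their committed offsets were never fetched.
     */
    static Set<String> twoReads(SketchSubscriptionState state, Runnable betweenSteps) {
        // Step 1: fetch committed offsets for whatever needs a position right now.
        Set<String> fetchedCommitted = state.snapshotNeedingPosition();
        // The assignment changes between the two steps (e.g. tp1 is reconciled).
        betweenSteps.run();
        // Step 2: reset everything that still needs a position, read again from the state.
        Set<String> toReset = state.snapshotNeedingPosition();
        toReset.removeAll(fetchedCommitted);
        return toReset;
    }

    /**
     * Single-event flow: one snapshot drives both steps, so no partition can be
     * reset without first having been part of the committed-offsets fetch.
     */
    static Set<String> singleSnapshot(SketchSubscriptionState state, Runnable betweenSteps) {
        Set<String> partitions = state.snapshotNeedingPosition();
        Set<String> fetchedCommitted = new LinkedHashSet<>(partitions);
        betweenSteps.run(); // later changes are picked up by the next event instead
        Set<String> toReset = new LinkedHashSet<>(partitions);
        toReset.removeAll(fetchedCommitted);
        return toReset;
    }
}
```

With a state that initially holds only tp0 and a callback that assigns tp1, `twoReads` reports tp1 as reset without a committed-offsets fetch, while `singleSnapshot` reports nothing.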


> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0
>
>
> The updateFetchPositions func in the new consumer performs several actions 
> based on the assigned partitions from the subscriptionState. The way it's 
> currently implemented, it fetches committed offsets for partitions that 
> required a position (retrieved from subscription state in the app thread), 
> and then resets positions for the 

[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-02 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-17066:
---
Description: 
{color:red}colored text{color}The updateFetchPositions func in the new consumer 
performs several actions based on the assigned partitions from the 
subscriptionState. The way it's currently implemented, it fetches committed 
offsets for partitions that required a position (retrieved from subscription 
state in the app thread), and then resets positions for the partitions still 
needing one (retrieved from the subscription state but in the background 
thread). 
This is problematic, given that the assignment/subscriptionState may change in 
the background thread at any time (e.g. new partitions reconciled), so we could 
end up resetting positions to the partition offsets for a partition for which 
we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers 
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
partitions requiring a position (taking them from 
subscriptions.initializingPartitions()). This will fetch committed offsets for 
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation 
(adds it to the subscription state as INITIALIZING, requires a position)
 * app thread -> updateFetchPositions resets positions for all partitions that 
still don't have a valid position after initWithCommittedOffsetsIfNeeded 
(taking them from subscriptionState.partitionsNeedingReset). This will 
mistakenly consider that it should reset tp1 to the partition offsets, when in 
reality it never even tried fetching the committed offsets for it because it 
wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 

We should consider moving the updateFetchPositions as a single event to the 
background, that would safely use the subscriptionState object and apply all 
actions involved in the updateFetchPositions to the same consistent set of 
partitions assigned at that moment. 

  was:
The updateFetchPositions func in the new consumer performs several actions 
based on the assigned partitions from the subscriptionState. The way it's 
currently implemented, it fetches committed offsets for partitions that 
required a position (retrieved from subscription state in the app thread), and 
then resets positions for the partitions still needing one (retrieved from the 
subscription state but in the background thread). 
This is problematic, given that the assignment/subscriptionState may change in 
the background thread at any time (e.g. new partitions reconciled), so we could 
end up resetting positions to the partition offsets for a partition for which 
we never even attempted to retrieve committed offsets.

Consider this sequence for a consumer that owns partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers 
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
partitions requiring a position (taking them from 
subscriptions.initializingPartitions()). This will fetch committed offsets for 
tp0 only.
 * background thread -> receives new partition tp1 and completes reconciliation 
(adds it to the subscription state as INITIALIZING, requires a position)
 * app thread -> updateFetchPositions resets positions for all partitions that 
still don't have a valid position after initWithCommittedOffsetsIfNeeded 
(taking them from subscriptionState.partitionsNeedingReset). This will 
mistakenly consider that it should reset tp1 to the partition offsets, when in 
reality it never even tried fetching the committed offsets for it because it 
wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 

We should consider moving the updateFetchPositions as a single event to the 
background, that would safely use the subscriptionState object and apply all 
actions involved in the updateFetchPositions to the same consistent set of 
partitions assigned at that moment. 


> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0
>
>
> {color:red}colored text{color}The updateFetchPositions func in the new consumer 
> performs several actions based on the assigned partitions from the 
> subscriptionState. The way it's currently implemented, it fetches committed 
> offsets for partitions that required a position (retrieved from subscription 
> state in the app thread), and then 

Re: [PR] KAFKA-16529; Implement raft response handling [kafka]

2024-07-02 Thread via GitHub


jsancio commented on code in PR #16454:
URL: https://github.com/apache/kafka/pull/16454#discussion_r1663345822


##
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##
@@ -86,10 +87,10 @@ protected CandidateState(
         this.backoffTimer = time.timer(0);
         this.log = logContext.logger(CandidateState.class);
 
-        for (Integer voterId : voters.voterIds()) {
-            voteStates.put(voterId, State.UNRECORDED);
+        for (ReplicaKey voter : voters.voterKeys()) {
+            voteStates.put(voter.id(), new VoterState(voter));
         }
-        voteStates.put(localId, State.GRANTED);
+        voteStates.get(localId).setState(State.GRANTED);

Review Comment:
   The invariant is that only voters can transition to `CandidateState`, which 
means that `voteStates` will always contain the local id. I added this check to 
verify that: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/CandidateState.java#L67-L76
   
   Having said that, I was looking at some of the transitions and I see that we 
don't satisfy that invariant in all cases. I filed this Jira so I can revisit it: 
[Fix the transition to CandidateState](https://issues.apache.org/jira/browse/KAFKA-17067). 
I should probably work on this Jira before we merge the AddRaftVoter RPC PR.
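
   The invariant discussed in this comment can be illustrated with a small, self-contained sketch. It is not the code from the PR: `CandidateVotesSketch`, `Vote`, and `voteOf` are hypothetical names, plain integer ids replace `ReplicaKey`, and the handling of a previously rejected vote is an assumption because the quoted hunks do not show it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of a candidate's vote bookkeeping. */
final class CandidateVotesSketch {
    enum Vote { UNRECORDED, GRANTED, REJECTED }

    private final Map<Integer, Vote> voteStates = new HashMap<>();

    CandidateVotesSketch(int localId, Set<Integer> voterIds) {
        // Invariant: only a voter may become a candidate.
        if (!voterIds.contains(localId)) {
            throw new IllegalStateException(
                "Local replica " + localId + " is not in the voter set " + voterIds);
        }
        for (int voterId : voterIds) {
            voteStates.put(voterId, Vote.UNRECORDED);
        }
        // Safe because of the check above: the entry for localId always exists.
        voteStates.put(localId, Vote.GRANTED);
    }

    /** Returns true only when this call changed the vote from UNRECORDED to GRANTED. */
    boolean recordGrantedVote(int remoteNodeId) {
        Vote previous = voteStates.get(remoteNodeId);
        if (previous == null) {
            throw new IllegalArgumentException("Attempt to grant vote to non-voter " + remoteNodeId);
        } else if (previous == Vote.REJECTED) {
            // Assumption: a rejected vote cannot later be granted.
            throw new IllegalArgumentException("Voter " + remoteNodeId + " already rejected the vote");
        }
        voteStates.put(remoteNodeId, Vote.GRANTED);
        return previous == Vote.UNRECORDED;
    }

    Vote voteOf(int nodeId) {
        return voteStates.get(nodeId);
    }
}
```

   Failing fast in the constructor means a missing local id surfaces as a clear error at the transition, instead of a `NullPointerException` later when the local vote is marked as granted.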






[jira] [Created] (KAFKA-17067) Fix the transition to CandidateState

2024-07-02 Thread Jira
José Armando García Sancio created KAFKA-17067:
--

 Summary: Fix the transition to CandidateState
 Key: KAFKA-17067
 URL: https://issues.apache.org/jira/browse/KAFKA-17067
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio
 Fix For: 3.9.0


Only voters should be able to transition to CandidateState. The current code 
allows VotedState to transition to CandidateState, but not every replica in 
VotedState is a voter.

This was relaxed as part of KIP-853: non-voters are allowed to vote for leaders 
even though they locally think they are not voters.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16529; Implement raft response handling [kafka]

2024-07-02 Thread via GitHub


jsancio commented on code in PR #16454:
URL: https://github.com/apache/kafka/pull/16454#discussion_r1663336934


##
raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java:
##
@@ -58,193 +60,229 @@ private CandidateState newCandidateState(VoterSet voters) {
         );
     }
 
-    @Test
-    public void testSingleNodeQuorum() {
-        CandidateState state = newCandidateState(voterSetWithLocal(IntStream.empty()));
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })

Review Comment:
   Looks like Kafka's checkstyle accepts both (`{ true, false }` and `{true, 
false}`). If we want to change this, do you mind if we do this in another PR? I 
use this style in a lot of places.






Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663330737


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} (inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers without actually storing them.
+ */
+public class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from  The starting value (inclusive) of the range.
+     * @param to    The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(Integer integer) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Integer> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = from; i < to; i++) {
+            sb.append(i);
+            if (i < to - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append("]");
+        return sb.toString();

Review Comment:
   Okie I changed it!
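
   For readers following the thread, here is a minimal sketch of the same "view over a range" idea built on `java.util.AbstractSet`. It is an alternative shown for illustration, not the approach taken in the PR; `IntRangeView` is a hypothetical name. The quoted hunk stops at `toString`, so whether the PR's class also defines `equals` and `hashCode` is not visible here. `AbstractSet` supplies both, and `AbstractCollection` supplies `toString`, all derived from `size()`, `contains()` and `iterator()`.

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical read-only view over the integers in [from, to). */
final class IntRangeView extends AbstractSet<Integer> {
    private final int from;
    private final int to;

    IntRangeView(int from, int to) {
        if (to < from) {
            throw new IllegalArgumentException("to must not be smaller than from");
        }
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        // Assumes the range is small enough that to - from does not overflow.
        return to - from;
    }

    @Override
    public boolean contains(Object o) {
        if (!(o instanceof Integer)) return false;
        int value = (Integer) o;
        return value >= from && value < to;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;

            @Override
            public boolean hasNext() {
                return current < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current++;
            }
        };
    }
}
```

   Because equality comes from `AbstractSet`, `new IntRangeView(0, 3)` compares equal to a `HashSet` holding 0, 1 and 2 in both directions, so a test could compare it against an expected set directly.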






Re: [PR] KAFKA-16529; Implement raft response handling [kafka]

2024-07-02 Thread via GitHub


jsancio commented on code in PR #16454:
URL: https://github.com/apache/kafka/pull/16454#discussion_r1663328497


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1895,12 +2021,14 @@ private Optional<Boolean> maybeHandleCommonResponse(
             }
         } else if (error == Errors.BROKER_NOT_AVAILABLE) {
             return Optional.of(false);
-        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL) {
-            // For now we treat this as a fatal error. Once we have support for quorum
-            // reassignment, this error could suggest that either we or the recipient of
-            // the request just has stale voter information, which means we can retry
-            // after backing off.
-            throw new IllegalStateException("Received error indicating inconsistent voter sets");
+        } else if (error == Errors.INVALID_VOTER_KEY) {
+            // The voter key in the request for VOTE and BEGIN_QUORUM_EPOCH doesn't match the
+            // receiver's replica key
+            logger.info(

Review Comment:
   I added the node information of the receiver (sender of the response). At 
this point in the code we don't have easy access to the voter key 
(`ReplicaKey`) sent in the request. To get access to it I would have to extend 
the response wrapper (`RaftResponse.Inbound`) to include the original request.






Re: [PR] KAFKA-16529; Implement raft response handling [kafka]

2024-07-02 Thread via GitHub


jsancio commented on code in PR #16454:
URL: https://github.com/apache/kafka/pull/16454#discussion_r1663323596


##
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##
@@ -147,14 +148,18 @@ public boolean isVoteRejected() {
      * rejected by this node
      */
     public boolean recordGrantedVote(int remoteNodeId) {
-        State state = voteStates.get(remoteNodeId);
-        if (state == null) {
+        VoterState voterState = voteStates.get(remoteNodeId);
+        if (voterState == null) {
             throw new IllegalArgumentException("Attempt to grant vote to non-voter " + remoteNodeId);
-        } else if (state == State.REJECTED) {
+        } else if (voterState.state() == State.REJECTED) {

Review Comment:
   Fixed. Limited the changes to only this file.






Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-07-02 Thread via GitHub


C0urante commented on PR #15302:
URL: https://github.com/apache/kafka/pull/15302#issuecomment-2204763797

   @gharris1727 sorry for the delay. You're correct that this PR did not 
actually address KAFKA-15524, and I'm honestly not sure how I missed that the 
first time around. I've updated the description to only mention KAFKA-15917, 
added some useful info to assertion messages when sink connectors fail to catch 
up to the expected committed offsets in these tests, and rebased on the latest 
trunk.
   
   Would you mind giving this another pass when you have a moment? Currently 
this test is the flakiest of all Connect integration tests, failing nearly 10% 
of the time.





Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]

2024-07-02 Thread via GitHub


C0urante commented on code in PR #15302:
URL: https://github.com/apache/kafka/pull/15302#discussion_r1663319861


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -801,6 +806,11 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
 
         connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1,
                 "Connector tasks did not start in time.");
 
+        // Make sure the tasks' consumers have had a chance to actually form a group
+        // (otherwise, the reset request will succeed because there won't be any active consumers)
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION,

Review Comment:
   Great point, I've removed mention of KAFKA-15524 from this PR.






[jira] [Commented] (KAFKA-17064) New consumer assign should update assignment in background thread

2024-07-02 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862619#comment-17862619
 ] 

PoAn Yang commented on KAFKA-17064:
---

Hi [~lianetm], thanks for your explanation. If you're not working on this, may 
I assign the issue to myself? I can handle it after 
[https://github.com/apache/kafka/pull/16449] is merged. Thank you.

> New consumer assign should update assignment in background thread
> -
>
> Key: KAFKA-17064
> URL: https://issues.apache.org/jira/browse/KAFKA-17064
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> With the new async consumer, the subscriptionState object is shared between 
> the app thread and the background thread, but in principle all updates to the 
> assignment should happen in the background thread, to avoid race conditions. 
> Note that most updates to the assignment already happen in the background, 
> as a result of app event processing (unsubscribe, reconciliations, etc.). 
> We've faced such races in places like unsubscribe and close, fixed by 
> ensuring that all assignment updates happen in the background, and this also 
> needs to be reviewed for consumer.assign. The current implementation 
> triggers an AssignmentChange event that is processed in the background, but 
> that event does not actually change the assignment. It only commits offsets, 
> and the assignment is updated in the app thread by calling 
> subscriptionState.assignFromUser. 
> We should consider moving the assignment update to the background thread, as 
> part of the AssignmentChangeEvent.
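
The proposal quoted above, that assignment updates are applied only by the background thread, can be sketched as follows. This is an illustrative model and not the consumer's actual code: `BackgroundOwnedAssignmentSketch` is a hypothetical class, a single-threaded executor stands in for the consumer's background thread, and partitions are plain strings.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model: the assignment is only ever touched by one background thread. */
final class BackgroundOwnedAssignmentSketch implements AutoCloseable {
    // Single daemon thread standing in for the consumer's background thread.
    private final ExecutorService background = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "background-sketch");
        thread.setDaemon(true);
        return thread;
    });

    // Read and written only from tasks running on the background thread.
    private final Set<String> assignment = new HashSet<>();

    /** App-thread entry point: enqueue the change and wait until it has been applied. */
    void assign(Collection<String> partitions) {
        Set<String> copy = new HashSet<String>(partitions);
        CompletableFuture.runAsync(() -> {
            assignment.clear();
            assignment.addAll(copy);
        }, background).join();
    }

    /** Reads also go through the background thread and return a defensive copy. */
    Set<String> assignment() {
        return CompletableFuture
            .supplyAsync(() -> new HashSet<String>(assignment), background)
            .join();
    }

    @Override
    public void close() {
        background.shutdown();
    }
}
```

Since every read and write runs as a task on the same thread, changes are applied in the order they were enqueued and the app thread never mutates the shared state directly.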





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


showuon commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204751198

   @frankvicky, any thoughts on this? I'd like to hear your view, since you are 
the author of this PR.





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


showuon commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204748082

   >  1.   update docs to explain the possible error when you define the broker 
id for the zk broker which is using broker.id.generation.enable
   
   If my understanding is correct, the possible error is that the broker ID is 
ignored and the generated ID might exceed the max ID range, which causes the 
error reported in the JIRA. Is that right?
   
> 2.  in order to minimize the changes, we can update both broker.id and 
node.id together. The side effect is KafkaConfig#nodeId will be a mutable 
variable.
   
   IMO, this looks like a bug. We should apply the generated ID to node.id as 
well. WDYT?
   





Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663307155


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,233 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        public final Uuid topicId;
+        public final int numPartitions;
+        public int numMembers;
+
+        public int minQuota = -1;
+        public int extraPartitions = -1;
+        public int nextRange = 0;
+
+        /**
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId       The topic Id.
+         * @param numPartitions The number of partitions.
+         * @param numMembers    The number of subscribed members.
+         */
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) {
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
+
         /**
-         * Member Id.
+         * Factory method to create a TopicMetadata instance.
+         *
+         * @param topicId       The topic Id.
+         * @param numPartitions The number of partitions.
+         * @param numMembers    The number of subscribed members.
+         * @return A new TopicMetadata instance.
          */
-        private final String memberId;
+        public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) {
+            return new TopicMetadata(topicId, numPartitions, numMembers);
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra partitions, if not already computed.
          */
-        private final int remaining;
+        void maybeComputeQuota() {
+            // The minimum number of partitions each member should receive for a balanced assignment.
+            if (minQuota != -1) return;
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+        @Override
+        public String toString() {
+            return "TopicMetadata{" +
+                "topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                '}';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                 The specification required for group assignments.
-     * @param subscribedTopicDescriber  The metadata describer for subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new 
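
As a worked example of the quota arithmetic in maybeComputeQuota, the sketch below splits a topic's partitions into contiguous ranges. It is not the PR's code: RangeQuotaSketch and ranges are hypothetical names, and handing the extra partitions to the first members in order is an assumption made here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RangeQuotaSketch {
    // Returns one [start, end) pair per member, in member order.
    static List<int[]> ranges(int numPartitions, int numMembers) {
        // Same arithmetic as in the diff: base quota plus a remainder.
        int minQuota = numPartitions / numMembers;
        int extraPartitions = numPartitions % numMembers;

        List<int[]> result = new ArrayList<>();
        int nextRange = 0;
        for (int member = 0; member < numMembers; member++) {
            // Assumption: the first `extraPartitions` members each take one extra partition.
            int quota = minQuota + (member < extraPartitions ? 1 : 0);
            result.add(new int[] {nextRange, nextRange + quota});
            nextRange += quota;
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions, 3 members: minQuota = 2, extraPartitions = 1.
        for (int[] r : ranges(7, 3)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
        // prints [0, 3), [3, 5), [5, 7) on separate lines
    }
}
```

With 7 partitions and 3 members, every member gets at least 2 partitions and exactly one member gets a third, so the sizes differ by at most one.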

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663277435


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} (inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers without actually storing them.
+ */
+public class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from  The starting value (inclusive) of the range.
+     * @param to    The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(Integer integer) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Integer> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = from; i < to; i++) {
+            sb.append(i);
+            if (i < to - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    /**
+     * Compares the specified object with this set for equality. Returns {@code true}
+     * if the specified object is also a set, the two sets have the same size, and
+     * every member of the specified set is contained in this set.
+     *
+     * @param o object to be compared for equality with this set
+     * @return {@code true} if the specified object is equal to this set
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof Set)) return false;
+
+        Set<?> otherSet = (Set<?>) o;
+        if (otherSet.size() != this.size()) return false;
+
+        for (int i = from; i < to; i++) {
+            if (!otherSet.contains(i)) return false;
+        }

Review Comment:
   Yes, that makes sense.
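
The equality contract in the Javadoc above (same size, every member contained) is the one java.util.AbstractSet already implements, together with a matching hashCode. As a rough illustration only, a range view built on AbstractSet could look like the sketch below; IntRangeView is a hypothetical name, it is not the RangeSet class from the PR, and it assumes from <= to.

```java
import java.util.AbstractSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

// Hypothetical, simplified range view; not the RangeSet class from the PR.
class IntRangeView extends AbstractSet<Integer> {
    private final int from; // inclusive
    private final int to;   // exclusive; assumed to be >= from

    IntRangeView(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        return to - from;
    }

    // O(1) membership check instead of AbstractCollection's linear scan.
    @Override
    public boolean contains(Object o) {
        if (!(o instanceof Integer)) return false;
        int value = (Integer) o;
        return value >= from && value < to;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;

            @Override
            public boolean hasNext() {
                return current < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current++;
            }
        };
    }
}

class IntRangeViewDemo {
    public static void main(String[] args) {
        Set<Integer> range = new IntRangeView(0, 3);
        Set<Integer> hash = new HashSet<>(Set.of(0, 1, 2));

        // equals and hashCode are inherited from AbstractSet, so equality is symmetric.
        System.out.println(range.equals(hash));                   // true
        System.out.println(hash.equals(range));                   // true
        System.out.println(range.hashCode() == hash.hashCode());  // true
    }
}
```

Because both sides follow the java.util.Set contract, a test can compare such a view directly against a HashSet without copying either set first.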




Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663251568


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663248450


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##

Re: [PR] MINOR: Automatically await startup and cluster warmup in Connect EmbeddedKafkaCluster::start [kafka]

2024-07-02 Thread via GitHub


C0urante commented on PR #16327:
URL: https://github.com/apache/kafka/pull/16327#issuecomment-2204593040

   This doesn't seem to have had a noticeable effect on test flakiness, going 
to abandon it.





Re: [PR] MINOR: Automatically await startup and cluster warmup in Connect EmbeddedKafkaCluster::start [kafka]

2024-07-02 Thread via GitHub


C0urante closed pull request #16327: MINOR: Automatically await startup and 
cluster warmup in Connect EmbeddedKafkaCluster::start
URL: https://github.com/apache/kafka/pull/16327





[jira] [Commented] (KAFKA-16986) After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes

2024-07-02 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862607#comment-17862607
 ] 

Justine Olshan commented on KAFKA-16986:


I guess you may see this as we expire metadata after metadata.max.idle.ms (= 
30). I wonder if that's what is happening.

> After upgrading to Kafka 3.4.1, the producer constantly produces logs related 
> to topicId changes
> 
>
> Key: KAFKA-16986
> URL: https://issues.apache.org/jira/browse/KAFKA-16986
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 3.0.1, 3.6.1
> Reporter: Vinicius Vieira dos Santos
> Priority: Minor
> Attachments: image-2024-07-01-09-05-17-147.png, image.png
>
>
> When updating the Kafka broker from version 2.7.0 to 3.4.1, we noticed that 
> the applications began to log the message "{*}Resetting the last seen epoch 
> of partition PAYMENTS-0 to 0 since the associated topicId changed from null 
> to szRLmiAiTs8Y0nI8b3Wz1Q{*}" very frequently. From what I understand, this 
> behavior is not expected: the topic was not deleted and recreated, so the 
> client should simply use the cached data and not reach this log line.
> We have some applications with around 15 topics and 40 partitions each, which 
> means around 600 log lines whenever a metadata update occurs.
> The main thing I want to know is whether this could indicate a problem, or 
> whether I can simply change the log level of the 
> org.apache.kafka.clients.Metadata class to WARN without worries.
>  
> There are other reports of the same behavior like this:  
> [https://stackoverflow.com/questions/74652231/apache-kafka-resetting-the-last-seen-epoch-of-partition-why]
>  
> *Some log occurrences over an interval of about 7 hours, each block refers to 
> an instance of the application in kubernetes*
>  
> !image.png!
> *My scenario:*
> *Application:*
>  - Java: 21
>  - Client: 3.6.1, also tested on 3.0.1 and has the same behavior
> *Broker:*
>  - Cluster running on Kubernetes with the bitnami/kafka:3.4.1-debian-11-r52 
> image
>  
> *Producer Config*
>  
>     acks = -1
>     auto.include.jmx.reporter = true
>     batch.size = 16384
>     bootstrap.servers = [server:9092]
>     buffer.memory = 33554432
>     client.dns.lookup = use_all_dns_ips
>     client.id = producer-1
>     compression.type = gzip
>     connections.max.idle.ms = 54
>     delivery.timeout.ms = 3
>     enable.idempotence = true
>     interceptor.classes = []
>     key.serializer = class 
> org.apache.kafka.common.serialization.ByteArraySerializer
>     linger.ms = 0
>     max.block.ms = 6
>     max.in.flight.requests.per.connection = 1
>     max.request.size = 1048576
>     metadata.max.age.ms = 30
>     metadata.max.idle.ms = 30
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 3
>     partitioner.adaptive.partitioning.enable = true
>     partitioner.availability.timeout.ms = 0
>     partitioner.class = null
>     partitioner.ignore.keys = false
>     receive.buffer.bytes = 32768
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 3
>     retries = 3
>     retry.backoff.ms = 100
>     sasl.client.callback.handler.class = null
>     sasl.jaas.config = [hidden]
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 6
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.login.callback.handler.class = null
>     sasl.login.class = null
>     sasl.login.connect.timeout.ms = null
>     sasl.login.read.timeout.ms = null
>     sasl.login.refresh.buffer.seconds = 300
>     sasl.login.refresh.min.period.seconds = 60
>     sasl.login.refresh.window.factor = 0.8
>     sasl.login.refresh.window.jitter = 0.05
>     sasl.login.retry.backoff.max.ms = 1
>     sasl.login.retry.backoff.ms = 100
>     sasl.mechanism = PLAIN
>     sasl.oauthbearer.clock.skew.seconds = 30
>     sasl.oauthbearer.expected.audience = null
>     sasl.oauthbearer.expected.issuer = null
>     sasl.oauthbearer.jwks.endpoint.refresh.ms = 360
>     sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 1
>     sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>     sasl.oauthbearer.jwks.endpoint.url = null
>     sasl.oauthbearer.scope.claim.name = scope
>     sasl.oauthbearer.sub.claim.name = sub
>     sasl.oauthbearer.token.endpoint.url = null
>     security.protocol = SASL_PLAINTEXT
>     security.providers = null
>     send.buffer.bytes = 131072
>     socket.connection.setup.timeout.max.ms = 3
>     socket.connection.setup.timeout.ms = 1
>    

[jira] [Commented] (KAFKA-16986) After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes

2024-07-02 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862606#comment-17862606
 ] 

Justine Olshan commented on KAFKA-16986:


Hey, thanks for taking a look at it. It is very strange that the same producer 
would log this message more than once, since it means the producer somehow got 
metadata with a topic ID, updated it (first instance of the message), then lost 
it and logged the message again upon getting it back. If a producer is writing 
to the same topic consistently, I'm not sure how it could lose the topic ID in 
the metadata unless some brokers have metadata containing the topic ID and 
others do not. 


Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]

2024-07-02 Thread via GitHub


bbejeck commented on PR #16503:
URL: https://github.com/apache/kafka/pull/16503#issuecomment-2204539517

   The previous build failures are unrelated. Kicking off the build one more 
time to confirm that the test passes.





Re: [PR] KAFKA-17047: Refactored group coordinator classes to modern package (KIP-932) [kafka]

2024-07-02 Thread via GitHub


apoorvmittal10 commented on PR #16474:
URL: https://github.com/apache/kafka/pull/16474#issuecomment-2204398771

   The build passed, with unrelated test failures.





Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]

2024-07-02 Thread via GitHub


apoorvmittal10 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1663148002


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not been set explicitly
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+    val topicNames = metadataCache.topicIdsToNames()
+    val sharePartitionManager : SharePartitionManager = sharePartitionManagerOption match {
+      case Some(manager) => manager
+      case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized")
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+    val clientId = request.header.clientId
+    val memberId = shareFetchRequest.data().memberId()
+    val shareSessionEpoch = shareFetchRequest.data().shareSessionEpoch()
+
+    val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+    val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+    var cachedTopicPartitions : util.List[TopicIdPartition] = null
+
+    if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+      try {
+        cachedTopicPartitions = sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId))

Review Comment:
   @AndrewJSchofield @adixitconfluent @omkreddy Can I get a review on this, 
please: https://github.com/apache/kafka/pull/16513






[PR] KAFKA-16754: Removing partitions from release API (KIP-932) [kafka]

2024-07-02 Thread via GitHub


apoorvmittal10 opened a new pull request, #16513:
URL: https://github.com/apache/kafka/pull/16513

   The release API exposed Partitions, which should be an internal 
implementation detail of the `releaseAcquiredRecords` API. Also reduce the scope 
of the cached topic partitions method, as it's not needed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-17064) New consumer assign should update assignment in background thread

2024-07-02 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862589#comment-17862589
 ] 

Lianet Magrans commented on KAFKA-17064:


Hey [~yangpoan], definitely, based on events that in the end will execute in 
the background thread. I see 2 cases:

1. If the assign has empty partitions, I would say we're covered: it relies on a 
call to unsubscribe, which already performs all assignment updates in the 
background. We just need KAFKA-17017 to get it all right when the member is not 
in a group. 
2. If the assign has non-empty partitions, it will end up updating the 
assignment in the app thread, and we should fix that (this issue). We should 
indeed rely on an event, and we actually already have it: the 
AssignmentChangeEvent. The issue is that this event currently does only part of 
the job (it only triggers a commit, see 
https://github.com/apache/kafka/blob/c97d4ce026fdb75f80760bcce00b239db951a481/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L181).
 The name is misleading. 
We should consider having that event handle the full assignment change (commit 
and update the subscription state with assignFromUser), all in the background.  
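
To make case 2 concrete, here is a minimal sketch of the shape being proposed, assuming a single-threaded executor stands in for the background thread. All names (`AssignInBackgroundSketch`, `assign`, `steps`) are hypothetical and are not the real consumer classes; the commit is only simulated by a log entry.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in; none of these names are the real Kafka classes.
final class AssignInBackgroundSketch {
    // Stand-in for the shared subscription state; only the background thread mutates it.
    private final Set<String> assignedPartitions = new HashSet<>();
    // Records the order of the steps executed by the background thread.
    private final StringBuilder log = new StringBuilder();
    // Single-threaded executor playing the role of the background thread.
    private final ExecutorService background = Executors.newSingleThreadExecutor();

    // Called from the app thread: it only enqueues the event and waits for it to complete.
    Set<String> assign(Set<String> partitions) {
        Set<String> requested = new HashSet<>(partitions); // defensive copy
        CompletableFuture<Set<String>> done = CompletableFuture.supplyAsync(() -> {
            // Step 1: what the event already does today (commit), simulated here.
            log.append("commit;");
            // Step 2: the part proposed to move here (an assignFromUser equivalent).
            assignedPartitions.clear();
            assignedPartitions.addAll(requested);
            log.append("assign;");
            return Collections.unmodifiableSet(new HashSet<>(assignedPartitions));
        }, background);
        return done.join();
    }

    // Safe to read from the app thread after assign() returned.
    String steps() {
        return log.toString();
    }

    void close() {
        background.shutdown();
    }
}
```

The point of the sketch is only that the app thread never touches the shared state directly; both steps run in one event on the background thread.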

> New consumer assign should update assignment in background thread
> -
>
> Key: KAFKA-17064
> URL: https://issues.apache.org/jira/browse/KAFKA-17064
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> With the new async consumer, the subscriptionState object is shared between 
> the app thread and the background thread, but in principle all updates to the 
> assignment should happen in the background thread, to avoid race conditions. 
> Note that it's in the background where most updates to the assignment happen 
> (as a result of app event processing like unsubscribe, reconciliations, etc.). 
> We've faced such races in places like unsubscribe and close, fixed by 
> ensuring that all assignment updates happen in the background, and this also 
> needs to be reviewed for consumer.assign. The current implementation 
> triggers an AssignmentChange event that is processed in the background, but 
> that event is not really changing the assignment. It only commits offsets, 
> and the assignment is updated in the app thread by calling 
> subscriptionState.assignFromUser. 
> We should consider moving the assignment update to the background thread, as 
> part of the AssignmentChangeEvent.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Master fix downstream trigger [kafka]

2024-07-02 Thread via GitHub


Muen342 closed pull request #16512: Master fix downstream trigger
URL: https://github.com/apache/kafka/pull/16512





[jira] [Commented] (KAFKA-17017) AsyncConsumer#unsubscribe does not clean the assigned partitions

2024-07-02 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862584#comment-17862584
 ] 

Lianet Magrans commented on KAFKA-17017:


Hey [~yangpoan] / [~chia7712] , sorry I missed this, was out for a few days. 
Fair issue, the intention was not to change the behaviour, I agree with the gap 
reported. 

Thinking about an approach, I would imagine that we could simply delegate 
everything to the UnsubscribeEvent, meaning that AsyncConsumer#unsubscribe 
would not bother checking whether it is in a group or not (no need for point 1 
above), and we would just trigger the UnsubscribeEvent to address this 
situation in a single place (point 2 above only):  
{quote}2) `MembershipManagerImpl#leaveGroup` should call 
`subscriptions.unsubscribe` if the `isNotInGroup` is true
{quote}
Not sure if I'm missing a detail, but this seems like a consistent/simple 
approach to solve this. The membershipMgr already calls 
subscriptions.unsubscribe when it leaves the group (on leaveGroup()), so we 
would just complete the story to cover the not-in-group case, all handled in 
the same component. 
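
A minimal sketch of that idea, assuming a simplified two-state member. `LeaveGroupSketch` and its methods are hypothetical stand-ins, not the real MembershipManagerImpl; the leave-group request is only simulated by a counter.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the membership manager; not the real MembershipManagerImpl.
final class LeaveGroupSketch {
    enum MemberState { UNSUBSCRIBED, STABLE }

    private MemberState state = MemberState.UNSUBSCRIBED;
    private final Set<String> assigned = new HashSet<>();
    private int leaveRequests = 0;

    // Manual assignment: partitions are owned without ever joining a group.
    void assignManually(Set<String> partitions) {
        assigned.addAll(partitions);
    }

    // Group subscription: the member joins and receives partitions.
    void joinGroup(Set<String> partitions) {
        state = MemberState.STABLE;
        assigned.addAll(partitions);
    }

    // Single place that handles unsubscribe, whether or not the member is in a group.
    void leaveGroup() {
        if (state == MemberState.UNSUBSCRIBED) {
            // Not in a group: nothing to send to the coordinator, but the
            // assignment is still cleared (the gap reported in the issue).
            assigned.clear();
            return;
        }
        leaveRequests++; // simulated leave-group request
        assigned.clear();
        state = MemberState.UNSUBSCRIBED;
    }

    Set<String> assigned() {
        return new HashSet<>(assigned);
    }

    int leaveRequests() {
        return leaveRequests;
    }
}
```

With this shape the caller does not need to check group membership before triggering the unsubscribe; both paths end with an empty assignment.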

> AsyncConsumer#unsubscribe does not clean the assigned partitions
> 
>
> Key: KAFKA-17017
> URL: https://issues.apache.org/jira/browse/KAFKA-17017
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Major
>  Labels: kip-848-client-support
>
> According to docs [0] `Consumer#unsubscribe` should clean both subscribed and 
> assigned partitions. However, there are two issues about `AsyncConsumer`
> 1)  if we don't set group id,  `AsyncConsumer#unsubscribe`[1] will be no-op
> 2)  if we set group id, `AsyncConsumer` is always in `UNSUBSCRIBED` state and 
> so `MembershipManagerImpl#leaveGroup`[2] will be no-op
> [0] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L759
> [1] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1479
> [2] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L666





[PR] Master fix downstream trigger [kafka]

2024-07-02 Thread via GitHub


Muen342 opened a new pull request, #16512:
URL: https://github.com/apache/kafka/pull/16512

   fix map to correct branches 
https://github.com/confluentinc/kafka/pull/1204/files#diff-e6ffa5dc854b843b3ee3c3c28f8eae2f436c2df2b1ca299cca1fa5982e390cf8L54-L61
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-02 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-17066:
---
Priority: Blocker  (was: Major)

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
> Fix For: 3.9.0
>
>
> The updateFetchPositions function in the new consumer performs several actions 
> based on the assigned partitions from the subscriptionState. The way it's 
> currently implemented, it fetches committed offsets for partitions that 
> require a position (retrieved from the subscription state in the app thread), 
> and then resets positions for the partitions still needing one (retrieved 
> from the subscription state, but in the background thread). 
> This is problematic, given that the assignment/subscriptionState may change 
> in the background thread at any time (e.g. new partitions reconciled), so we 
> could end up resetting positions to the partition offsets for a partition for 
> which we never even attempted to retrieve committed offsets.  
> Consider this sequence for a consumer that owns a partition tp0:
>  * consumer owns tp0
>  * app thread -> updateFetchPositions triggers 
> initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
> partitions requiring a position (taking them from 
> subscriptions.initializingPartitions()). This will fetch committed offsets 
> for tp0 only. 
>  * background thread -> receives new partition tp1 and completes 
> reconciliation (adds it to the subscription state as INITIALIZING, requires a 
> position)
>  * app thread -> updateFetchPositions resets positions for all partitions 
> that still don't have a valid position after initWithCommittedOffsetsIfNeeded 
> (taking them from subscriptionState.partitionsNeedingReset). This will 
> mistakenly consider that it should reset tp1 to the partition offsets, when 
> in reality it never even tried fetching the committed offsets for it because 
> it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 
> We should consider moving updateFetchPositions to the background as a single 
> event that would safely use the subscriptionState object and apply all 
> actions involved in updateFetchPositions to the same consistent set of 
> partitions assigned at that moment. 





[jira] [Created] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-02 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-17066:
--

 Summary: New consumer updateFetchPositions should perform all 
operations in background thread
 Key: KAFKA-17066
 URL: https://issues.apache.org/jira/browse/KAFKA-17066
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.8.0
Reporter: Lianet Magrans
 Fix For: 3.9.0


The updateFetchPositions function in the new consumer performs several actions 
based on the assigned partitions from the subscriptionState. The way it's 
currently implemented, it fetches committed offsets for partitions that 
require a position (retrieved from the subscription state in the app thread), 
and then resets positions for the partitions still needing one (retrieved from 
the subscription state, but in the background thread). 
This is problematic, given that the assignment/subscriptionState may change in 
the background thread at any time (e.g. new partitions reconciled), so we could 
end up resetting positions to the partition offsets for a partition for which 
we never even attempted to retrieve committed offsets.  

Consider this sequence for a consumer that owns a partition tp0:
 * consumer owns tp0
 * app thread -> updateFetchPositions triggers 
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
partitions requiring a position (taking them from 
subscriptions.initializingPartitions()). This will fetch committed offsets for 
tp0 only. 
 * background thread -> receives new partition tp1 and completes 
reconciliation (adds it to the subscription state as INITIALIZING, requires a 
position)
 * app thread -> updateFetchPositions resets positions for all partitions that 
still don't have a valid position after initWithCommittedOffsetsIfNeeded 
(taking them from subscriptionState.partitionsNeedingReset). This will 
mistakenly consider that it should reset tp1 to the partition offsets, when in 
reality it never even tried fetching the committed offsets for it because it 
wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 

We should consider moving updateFetchPositions to the background as a single 
event that would safely use the subscriptionState object and apply all 
actions involved in updateFetchPositions to the same consistent set of 
partitions assigned at that moment. 
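
The sequence above can be sketched in a simplified, single-threaded form. This is only an illustration under stated assumptions: `UpdateFetchPositionsSketch`, `updateRacy`, `updateConsistent` and the `RESET` marker are hypothetical names, and the `betweenSteps` callback merely simulates a reconciliation landing between the two steps. In the proposed design nothing could interleave, since both steps would run inside one background event; the callback is kept in the second method only to show that a single snapshot shields the reset step.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the two-step position update; not the real consumer code.
final class UpdateFetchPositionsSketch {
    static final long RESET = -1L; // marks "reset to the partition offsets"

    // Partitions that still need a position (stand-in for INITIALIZING ones).
    private final Set<String> initializing = new LinkedHashSet<>();
    // Committed offsets known to the simulated group coordinator.
    private final Map<String, Long> committed = new HashMap<>();
    // Resulting positions per partition.
    final Map<String, Long> positions = new HashMap<>();

    void addPartition(String tp) { initializing.add(tp); }
    void setCommitted(String tp, long offset) { committed.put(tp, offset); }

    // Racy shape: the reset step re-reads the live state, so a partition added
    // in between is reset without ever being looked up.
    void updateRacy(Runnable betweenSteps) {
        applyCommitted(new TreeSet<>(initializing));
        betweenSteps.run();
        resetMissing(new TreeSet<>(initializing));
    }

    // Consistent shape: both steps use the same snapshot of partitions.
    void updateConsistent(Runnable betweenSteps) {
        Set<String> snapshot = new TreeSet<>(initializing);
        applyCommitted(snapshot);
        betweenSteps.run();
        resetMissing(snapshot);
    }

    private void applyCommitted(Set<String> tps) {
        for (String tp : tps) {
            Long offset = committed.get(tp);
            if (offset != null) positions.put(tp, offset);
        }
    }

    private void resetMissing(Set<String> tps) {
        for (String tp : tps) {
            if (!positions.containsKey(tp)) positions.put(tp, RESET);
        }
        // Partitions that now have a position no longer need one.
        initializing.removeAll(positions.keySet());
    }
}
```

In the racy variant, a tp1 that arrives between the steps is reset even though it has a committed offset; in the consistent variant it is left untouched and picked up, with its committed offset, by the next update.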





Re: [PR] KAFKA-17032: NioEchoServer should generate meaningful id instead of incremental number. [kafka]

2024-07-02 Thread via GitHub


chia7712 merged PR #16460:
URL: https://github.com/apache/kafka/pull/16460





[jira] [Resolved] (KAFKA-17032) NioEchoServer should generate meaningful id instead of incremential number

2024-07-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17032.

Fix Version/s: 3.9.0
   Resolution: Fixed

> NioEchoServer should generate meaningful id instead of incremential number
> --
>
> Key: KAFKA-17032
> URL: https://issues.apache.org/jira/browse/KAFKA-17032
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.9.0
>
>
> see discussion: 
> https://github.com/apache/kafka/pull/16384#issuecomment-2187071751





Re: [PR] KAFKA-16806: Explicitly declare JUnit dependencies for all test modules [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on code in PR #16301:
URL: https://github.com/apache/kafka/pull/16301#discussion_r1663048605


##
build.gradle:
##
@@ -35,6 +35,7 @@ plugins {
   id 'idea'
   id 'jacoco'
   id 'java-library'
+  id 'jvm-test-suite'

Review Comment:
   We have already introduced a dependency on the JUnit platform 
(https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L204), 
so maybe we can declare the dependencies manually?






[PR] KAFKA-16106: revert classic state transitions if deletion fails [kafka]

2024-07-02 Thread via GitHub


jeffkbkim opened a new pull request, #16511:
URL: https://github.com/apache/kafka/pull/16511

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-16550) add integration test for LogDirsCommand

2024-07-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16550.

Fix Version/s: 3.9.0
   Resolution: Fixed

> add integration test for LogDirsCommand
> ---
>
> Key: KAFKA-16550
> URL: https://issues.apache.org/jira/browse/KAFKA-16550
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.9.0
>
>
> Currently LogDirsCommand has only unit tests (UT)





Re: [PR] KAFKA-16550: add integration test for LogDirsCommand [kafka]

2024-07-02 Thread via GitHub


chia7712 merged PR #16485:
URL: https://github.com/apache/kafka/pull/16485





Re: [PR] KAFKA-17032: NioEchoServer should generate meaningful id instead of incremental number. [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on PR #16460:
URL: https://github.com/apache/kafka/pull/16460#issuecomment-2204075651

   @gharris1727 any feedback?





[jira] [Resolved] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version

2024-07-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17051.

Fix Version/s: 3.9.0
   Resolution: Fixed

> ApiKeys#toHtml should exclude the APIs having unstable latest version
> -
>
> Key: KAFKA-17051
> URL: https://issues.apache.org/jira/browse/KAFKA-17051
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
> Fix For: 3.9.0
>
>
> see the discussion: 
> https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o
> The (released) docs should show only the APIs which are ready to be exposed 
> publicly. Those APIs having "latestVersionUnstable=true" should be 
> excluded.





Re: [PR] KAFKA-17051: ApiKeys#toHtml should exclude the APIs having unstable latest version [kafka]

2024-07-02 Thread via GitHub


chia7712 merged PR #16480:
URL: https://github.com/apache/kafka/pull/16480





[jira] [Resolved] (KAFKA-3346) Rename "Mode" to "SslMode"

2024-07-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-3346.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Rename "Mode" to "SslMode"
> --
>
> Key: KAFKA-3346
> URL: https://issues.apache.org/jira/browse/KAFKA-3346
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Ksolves
>Priority: Major
> Fix For: 3.9.0
>
>
> In the channel builders, the Mode enum is undocumented, so it is unclear that 
> it is used to signify whether the connection is for SSL client or SSL server.
> I suggest renaming to SslMode (although adding documentation will be ok too, 
> if people object to the rename)





Re: [PR] KAFKA-3346: Refactor & Rename Mode to ConnectionMode [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on PR #16403:
URL: https://github.com/apache/kafka/pull/16403#issuecomment-2204063329

   @abhi-ksolves thanks for this contribution





Re: [PR] KAFKA-3346: Refactor & Rename Mode to ConnectionMode [kafka]

2024-07-02 Thread via GitHub


chia7712 merged PR #16403:
URL: https://github.com/apache/kafka/pull/16403





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204040094

   > Thus, I don't think it will cause the error as described in the JIRA, 
right? I still don't understand in what circumstance this issue will happen. It 
might need more explanation. Thanks.
   
   thanks for this response, it inspired me to dig into the root cause. The 
issue happens when the zk broker is using a generated broker id and is being 
migrated.
   
   1. if we define the broker id in the config file, the broker id is viewed as 
invalid since it is in the reserved range (when 
`broker.id.generation.enable=true`)
   2. if we don't define the broker id in the config file, the broker id will be 
evaluated according to `meta.properties` [0]. However, we forgot to update 
`node.id`, so the kraft client will be built with "node.id=-1" [1], which 
results in an error.
   
   
   [0] 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L261
   [1] 
https://github.com/apache/kafka/blob/e55c28c60b0f021b12c93fa04e360ce7a2e0a5ac/core/src/main/scala/kafka/raft/RaftManager.scala#L230
   
   In short, we don't consider the use case of "generated broker id (the broker 
id is omitted)" + "zk migration". It seems we need two fixes:
   
   1. update the docs to explain the possible error when you define the broker 
id for a zk broker which is using `broker.id.generation.enable`
   2. in order to minimize the changes, we can update both `broker.id` and 
`node.id` together. The side effect is that `KafkaConfig#nodeId` will become a 
mutable variable.
   
   @showuon WDYT?





Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662970452


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -3839,6 +3842,209 @@ public void close() {}
 assertEquals("response1", write.get(5, TimeUnit.SECONDS));
 }
 
+@Test
+public void testScheduleNonAtomicWriteOperation() throws 
ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(Duration.ofMillis(20))
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.withSerializer(new StringSerializer())
+.withAppendLingerMs(10)
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertNull(ctx.currentBatch);
+
+// Get the max batch size.
+int maxBatchSize = writer.config(TP).maxMessageSize();
+
+// Create records with a quarter of the max batch size each. Keep in 
mind that
+// each batch has a header so it is not possible to have those four 
records
+// in one single batch.
+List records = Stream.of('1', '2', '3', '4').map(c -> {
+char[] payload = new char[maxBatchSize / 4];
+Arrays.fill(payload, c);
+return new String(payload);
+}).collect(Collectors.toList());
+
+// Let's try to write all the records atomically (the default) to 
ensure
+// that it fails.
+CompletableFuture write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+state -> new CoordinatorResult<>(records, "write#1")
+);
+
+assertFutureThrows(write1, RecordTooLargeException.class);
+
+// Let's try to write the same records non-atomically.
+CompletableFuture write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+state -> new CoordinatorResult<>(records, "write#2", null, true, 
false)
+);
+
+// The write is pending.
+assertFalse(write2.isDone());
+
+// Verify the state.
+assertNotNull(ctx.currentBatch);
+// The last written offset is 3L because one batch was written to the 
log with
+// the first three records. The 4th one is pending.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertEquals(Arrays.asList(
+new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+), ctx.coordinator.coordinator().fullRecords());
+assertEquals(Collections.singletonList(
+records(timer.time().milliseconds(), records.subList(0, 3))
+), writer.entries(TP));
+
+// Commit up to 3L.
+writer.commit(TP, 3L);
+
+// The write is still pending.
+assertFalse(write2.isDone());
+
+// Advance past the linger time to flush the pending batch.
+timer.advanceClock(11);
+
+// Verify the state.
+assertNull(ctx.currentBatch);
+assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Arrays.asList(3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertEquals(Arrays.asList(
+new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+new 

Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


jolshan commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662961495


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -3839,6 +3842,209 @@ public void close() {}
 assertEquals("response1", write.get(5, TimeUnit.SECONDS));
 }
 
+@Test
+public void testScheduleNonAtomicWriteOperation() throws 
ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(Duration.ofMillis(20))
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.withSerializer(new StringSerializer())
+.withAppendLingerMs(10)
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertNull(ctx.currentBatch);
+
+// Get the max batch size.
+int maxBatchSize = writer.config(TP).maxMessageSize();
+
+// Create records with a quarter of the max batch size each. Keep in 
mind that
+// each batch has a header so it is not possible to have those four 
records
+// in one single batch.
+List records = Stream.of('1', '2', '3', '4').map(c -> {
+char[] payload = new char[maxBatchSize / 4];
+Arrays.fill(payload, c);
+return new String(payload);
+}).collect(Collectors.toList());
+
+// Let's try to write all the records atomically (the default) to 
ensure
+// that it fails.
+CompletableFuture write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+state -> new CoordinatorResult<>(records, "write#1")
+);
+
+assertFutureThrows(write1, RecordTooLargeException.class);
+
+// Let's try to write the same records non-atomically.
+CompletableFuture write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+state -> new CoordinatorResult<>(records, "write#2", null, true, 
false)
+);
+
+// The write is pending.
+assertFalse(write2.isDone());
+
+// Verify the state.
+assertNotNull(ctx.currentBatch);
+// The last written offset is 3L because one batch was written to the 
log with
+// the first three records. The 4th one is pending.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertEquals(Arrays.asList(
+new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+), ctx.coordinator.coordinator().fullRecords());
+assertEquals(Collections.singletonList(
+records(timer.time().milliseconds(), records.subList(0, 3))
+), writer.entries(TP));
+
+// Commit up to 3L.
+writer.commit(TP, 3L);
+
+// The write is still pending.
+assertFalse(write2.isDone());
+
+// Advance past the linger time to flush the pending batch.
+timer.advanceClock(11);
+
+// Verify the state.
+assertNull(ctx.currentBatch);
+assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Arrays.asList(3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+assertEquals(Arrays.asList(
+new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+new 

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1662942561


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+public class RangeSet implements Set<Integer> {
+private final int from;
+private final int to;
+
+/**
+ * Constructs a {@code RangeSet} with the specified range.
+ *
+ * @param from  The starting value (inclusive) of the range.
+ * @param to    The ending value (exclusive) of the range.
+ */
+public RangeSet(int from, int to) {
+this.from = from;
+this.to = to;
+}
+
+@Override
+public int size() {
+return to - from;
+}
+
+@Override
+public boolean isEmpty() {
+return size() == 0;
+}
+
+@Override
+public boolean contains(Object o) {
+if (o instanceof Integer) {
+int value = (Integer) o;
+return value >= from && value < to;
+}
+return false;
+}
+
+@Override
+public Iterator<Integer> iterator() {
+return new Iterator<Integer>() {
+private int current = from;
+
+@Override
+public boolean hasNext() {
+return current < to;
+}
+
+@Override
+public Integer next() {
+if (!hasNext()) throw new NoSuchElementException();
+return current++;
+}
+};
+}
+
+@Override
+public Object[] toArray() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public <T> T[] toArray(T[] a) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean add(Integer integer) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean remove(Object o) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean containsAll(Collection<?> c) {
+for (Object o : c) {
+if (!contains(o)) return false;
+}
+return true;
+}
+
+@Override
+public boolean addAll(Collection<? extends Integer> c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean retainAll(Collection<?> c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean removeAll(Collection<?> c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void clear() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public String toString() {
+StringBuilder sb = new StringBuilder();
+sb.append("[");
+for (int i = from; i < to; i++) {
+sb.append(i);
+if (i < to - 1) {
+sb.append(", ");
+}
+}
+sb.append("]");
+return sb.toString();

Review Comment:
   The short version seems better to me but it may be a personal preference. 
I’d also like to have the type printed out.
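
A shorter `toString()` along the lines suggested here might look like the sketch below. `RangeSetSketch` is a hypothetical, trimmed-down stand-in for the class under review, and the exact output format is an assumption rather than what the PR settled on.

```java
/**
 * Hypothetical, trimmed-down stand-in for the RangeSet under review. Only the
 * two bounds are kept, which is all that toString() needs.
 */
final class RangeSetSketch {
    private final int from;
    private final int to;

    RangeSetSketch(int from, int to) {
        this.from = from;
        this.to = to;
    }

    // Prints the type name and the bounds instead of every element, so the
    // output stays short and costs O(1) however wide the range is.
    @Override
    public String toString() {
        return "RangeSet(from=" + from + " (inclusive), to=" + to + " (exclusive))";
    }
}
```

Compared with the loop in the diff, this avoids building a string proportional to the range size and makes it clear in logs that the value is a range view rather than a materialized set.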






Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1662903374


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map<String, String> connectorConfigs) {
+List<ConfigValue> configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))
+.forEach(conf -> conf.errorMessages().add(errorMsg)));

Review Comment:
   Could you please use `addErrorMessage` instead of `errorMessages().add`?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map<String, String> connectorConfigs) {
+List<ConfigValue> configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))

Review Comment:
   `EMIT_OFFSET_SYNCS_ENABLED` is defined in `MirrorSourceConfig`, so it is not 
part of the config def of `MirrorCheckpointConnector`. Hence, an error related to 
`EMIT_OFFSET_SYNCS_ENABLED` can't be propagated.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -234,6 +234,30 @@ public ConfigDef config() {
 @Override
 public org.apache.kafka.common.config.Config validate(Map<String, String> 
props) {
 List<ConfigValue> configValues = super.validate(props).configValues();
+validateExactlyOnceConfigs(props, configValues);
+validateEmitOffsetSyncConfigs(props, configValues);
+
+return new org.apache.kafka.common.config.Config(configValues);
+}
+
+private static void validateEmitOffsetSyncConfigs(Map<String, String> 
props, List<ConfigValue> configValues) {
+boolean offsetSyncsConfigured = props.keySet().stream()
+.anyMatch(conf -> 
conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || 
conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX));
+
+if 
("false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && 
offsetSyncsConfigured) {

Review Comment:
   What if `EMIT_OFFSET_SYNCS_ENABLED=true` and `offsetSyncsConfigured=false`? 
Should we add an error message for it?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map<String, String> connectorConfigs) {
+List<ConfigValue> configValues = 
super.validate(connectorConfigs).configValues();
+MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+configValues.stream()
+.filter(conf -> conf.name().equals(config))

Review Comment:
   The following test shows my concern:
   ```java
   @Test
   public void test() {
   Map<String, String> props = new HashMap<>();
   props.put(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, "false");
   MirrorCheckpointConnector connector = new 
MirrorCheckpointConnector();
   Config config = connector.validate(props);
   assertEquals(1, config.configValues().stream().filter(c -> 
c.name().equals(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).count());
   }
   ```
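
One way to avoid silently dropping such an error is to attach it to a newly created config value when no value with that name exists yet. The sketch below is only an illustration of that idea: `ConfigValueSketch` and `ErrorPropagationSketch` are hypothetical stand-ins written for this example, not Kafka classes, and nothing here says the PR adopted this approach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in that mirrors only the parts of a config value used here. */
final class ConfigValueSketch {
    private final String name;
    private final List<String> errorMessages = new ArrayList<>();

    ConfigValueSketch(String name) { this.name = name; }

    String name() { return name; }

    List<String> errorMessages() { return errorMessages; }

    void addErrorMessage(String message) { errorMessages.add(message); }
}

final class ErrorPropagationSketch {
    /**
     * Attaches each error to the config value with the same name. When no such
     * value exists (the case raised in the review), a new one is created and
     * appended, so the error is reported instead of being filtered away.
     * The configValues list must be mutable.
     */
    static void propagate(Map<String, String> errors, List<ConfigValueSketch> configValues) {
        errors.forEach((name, errorMsg) -> {
            ConfigValueSketch target = configValues.stream()
                .filter(conf -> conf.name().equals(name))
                .findFirst()
                .orElseGet(() -> {
                    // The stream has already terminated here, so adding is safe.
                    ConfigValueSketch created = new ConfigValueSketch(name);
                    configValues.add(created);
                    return created;
                });
            target.addErrorMessage(errorMsg);
        });
    }
}
```

With the filter-only version from the diff, an error keyed by a name outside the connector's config def matches nothing and disappears. Here it ends up on its own entry, which is what the reviewer's test expects to find.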






Re: [PR] KAFKA-16934: Clean up and refactor release.py [kafka]

2024-07-02 Thread via GitHub


jlprat commented on PR #16287:
URL: https://github.com/apache/kafka/pull/16287#issuecomment-2203894286

   I wanted to use this script to release 3.8 now that it is finally unblocked.





Re: [PR] Revert "KAFKA-16154" and mark MV 3.8-IV0 as the latest production [kafka]

2024-07-02 Thread via GitHub


jlprat commented on PR #16400:
URL: https://github.com/apache/kafka/pull/16400#issuecomment-2203893011

   Merged! Thanks for the reviews!





Re: [PR] Revert "KAFKA-16154" and mark MV 3.8-IV0 as the latest production [kafka]

2024-07-02 Thread via GitHub


jlprat merged PR #16400:
URL: https://github.com/apache/kafka/pull/16400





[jira] [Updated] (KAFKA-17064) New consumer assign should update assignment in background thread

2024-07-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17064:
--
Component/s: clients
 consumer

> New consumer assign should update assignment in background thread
> -
>
> Key: KAFKA-17064
> URL: https://issues.apache.org/jira/browse/KAFKA-17064
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> With the new async consumer, the subscriptionState object is shared between 
> the app thread and the background thread, but in principle all updates to the 
> assignment should happen in the background thread, to avoid race conditions. 
> Note that most updates to the assignment already happen in the background 
> (as a result of app event processing like unsubscribe, reconciliations, etc.). 
> We've faced such races in places like unsubscribe and close, and fixed them by 
> ensuring that all assignment updates happen in the background. The same 
> needs to be reviewed for consumer.assign. The current implementation 
> triggers an AssignmentChange event that is processed in the background, but 
> that event does not really change the assignment. It only commits offsets, 
> and the assignment is updated in the app thread by calling 
> subscriptionState.assignFromUser. 
> We should consider moving the assignment update to the background thread, as 
> part of the AssignmentChangeEvent.  
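
The proposal in the description can be pictured with the sketch below, in which the app thread only hands the change over and a single background thread is the sole writer of the assignment. This is an illustrative model under stated assumptions: `BackgroundAssignmentSketch` and its methods are hypothetical, a single-threaded executor stands in for the consumer's background thread, and none of it is the actual consumer code.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Illustrative model only: a single-threaded executor plays the role of the
 * background thread, and it alone mutates the assignment.
 */
final class BackgroundAssignmentSketch implements AutoCloseable {
    private final ExecutorService backgroundThread = Executors.newSingleThreadExecutor();

    // Written only from the background thread. Volatile so that the app
    // thread always reads the most recently published value.
    private volatile Set<String> assignment = Collections.emptySet();

    /**
     * Called from the app thread. The change is enqueued as an event and the
     * caller blocks until the background thread has applied it.
     */
    void assign(Set<String> partitions) {
        Set<String> copy = Collections.unmodifiableSet(new HashSet<>(partitions));
        CompletableFuture.runAsync(() -> assignment = copy, backgroundThread).join();
    }

    Set<String> assignment() {
        return assignment;
    }

    @Override
    public void close() {
        backgroundThread.shutdown();
    }
}
```

Because every write goes through the same single thread, an assign cannot interleave with other background work such as an unsubscribe or a reconciliation, which is the kind of race the ticket describes.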



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1662897722


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,233 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class MemberWithRemainingAssignments {
+private static class TopicMetadata {
+public final Uuid topicId;
+public final int numPartitions;
+public int numMembers;
+
+public int minQuota = -1;
+public int extraPartitions = -1;
+public int nextRange = 0;
+
+/**
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembers    The number of subscribed members.
+ */
+private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+this.topicId = topicId;
+this.numPartitions = numPartitions;
+this.numMembers = numMembers;
+}
+
 /**
- * Member Id.
+ * Factory method to create a TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembers    The number of subscribed members.
+ * @return A new TopicMetadata instance.
  */
-private final String memberId;
+public static TopicMetadata create(Uuid topicId, int numPartitions, 
int numMembers) {
+return new TopicMetadata(topicId, numPartitions, numMembers);
+}
 
 /**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
  */
-private final int remaining;
+void maybeComputeQuota() {
+// The minimum number of partitions each member should receive for 
a balanced assignment.
+if (minQuota != -1) return;
+minQuota = numPartitions / numMembers;
+
+// Extra partitions to be distributed one to each member.
+extraPartitions = numPartitions % numMembers;
+}
 
-public MemberWithRemainingAssignments(String memberId, int remaining) {
-this.memberId = memberId;
-this.remaining = remaining;
+@Override
+public String toString() {
+return "TopicMetadata{" +
+"topicId=" + topicId +
+", numPartitions=" + numPartitions +
+", numMembers=" + numMembers +
+", minQuota=" + minQuota +
+", extraPartitions=" + extraPartitions +
+", nextRange=" + nextRange +
+'}';
 }
 }
 
 /**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for 
group assignments.
- * @param subscribedTopicDescriber  The metadata describer for 
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
  */
-private Map> membersPerTopic(
-final GroupSpec groupSpec,
-final SubscribedTopicDescriber subscribedTopicDescriber
-) {
-Map> membersPerTopic = new HashMap<>();
-
-if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-Collection allMembers = groupSpec.memberIds();
-Collection topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-.subscribedTopicIds();
-
-for (Uuid topicId : topics) {
-if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-}
-membersPerTopic.put(topicId, allMembers);
+private GroupAssignment assignHomogeneousGroup(
+GroupSpec groupSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+List memberIds = sortMemberIds(groupSpec);
+
+MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+Set subscribedTopics = new 

Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]

2024-07-02 Thread via GitHub


C0urante commented on code in PR #16477:
URL: https://github.com/apache/kafka/pull/16477#discussion_r1662610208


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java:
##
@@ -32,15 +43,59 @@
 public class RootResource {
 
 private final Herder herder;
+private final RestRequestTimeout requestTimeout;
+private final Time time;
 
 @Inject
-public RootResource(Herder herder) {
+public RootResource(Herder herder, RestRequestTimeout requestTimeout) {
+this(herder, requestTimeout, Time.SYSTEM);
+}
+
+// For testing only
+RootResource(Herder herder, RestRequestTimeout requestTimeout, Time time) {
 this.herder = herder;
+this.requestTimeout = requestTimeout;
+this.time = time;
 }
 
 @GET
-@Operation(summary = "Get details about this Connect worker and the id of 
the Kafka cluster it is connected to")
+@Operation(summary = "Get details about this Connect worker and the ID of 
the Kafka cluster it is connected to")
 public ServerInfo serverInfo() {
 return new ServerInfo(herder.kafkaClusterId());
 }
+
+@GET
+@Path("/health")
+@Operation(summary = "Health check endpoint to verify worker readiness and 
liveness")
+public Response healthCheck() throws Throwable {
+WorkerStatus workerStatus;
+int statusCode;
+try {
+FutureCallback cb = new FutureCallback<>();
+herder.healthCheck(cb);
+
+long timeoutNs = 
TimeUnit.MILLISECONDS.toNanos(requestTimeout.healthCheckTimeoutMs());
+long deadlineNs = timeoutNs + time.nanoseconds();
+time.waitForFuture(cb, deadlineNs);
+
+statusCode = Response.Status.OK.getStatusCode();
+workerStatus = WorkerStatus.healthy();
+} catch (TimeoutException e) {
+Stage stage = e instanceof StagedTimeoutException
+? ((StagedTimeoutException) e).stage()
+: null;
+if (!herder.isReady()) {
+statusCode = 
Response.Status.SERVICE_UNAVAILABLE.getStatusCode();
+workerStatus = WorkerStatus.starting(stage);
+} else {
+statusCode = 
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode();
+workerStatus = WorkerStatus.unhealthy(stage);
+}
+} catch (ExecutionException e) {
+throw e.getCause();

Review Comment:
   The exception mapper will catch this anyways, won't it? The intent was to 
hit [this 
part](https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L73-L77)
 of the code path and generate the standard 500-on-unexpected-error response.






Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on PR #16498:
URL: https://github.com/apache/kafka/pull/16498#issuecomment-2203203451

   @jeffkbkim @dongnuo123 @jolshan Thanks for your comments. I addressed them.





Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662541019


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -936,62 +941,90 @@ private void append(
 ));
 }
 
-// Compute the estimated size of the records.
-int estimatedSize = AbstractRecords.estimateSizeInBytes(
-currentBatch.builder.magic(),
-compression.type(),
-recordsToAppend
-);
+if (isAtomic) {

Review Comment:
   > To confirm my understanding, we take the original path when the append 
should be atomic, which means we verify whether all records can fit in the 
current batch. If not, we allocate a new batch then append the records. If the 
append does not need to be atomic, we append individual records until we can 
fit no more, then allocate a new batch.
   > 
   > This will reduce the number of writes to the log as we will have more 
filled batches on average. In practice, only the unload partition and cleanup 
group metadata jobs will have non-atomic writes today so we should not expect 
much impact.
   > 
   > Is this correct?
   
   Yes. This is correct. However, I did not make this change to improve how 
batches are filled but rather to ensure that we would not get stuck in the case 
where we have a large number of records that cannot fit in one batch.
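
The two append paths discussed in this exchange can be modelled roughly as follows. This is a simplified sketch, not the `CoordinatorRuntime` code: `BatchingSketch`, `TooLargeException`, and the use of plain integers as record sizes are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of atomic versus non-atomic appends. Records are
 * represented only by their size in bytes.
 */
final class BatchingSketch {
    /** Stand-in for the "record too large" failure. */
    static final class TooLargeException extends RuntimeException {
        TooLargeException(String message) { super(message); }
    }

    private final int maxBatchSize;
    private final List<List<Integer>> flushed = new ArrayList<>();
    private List<Integer> current = new ArrayList<>();
    private int currentSize = 0;

    BatchingSketch(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }

    void append(List<Integer> recordSizes, boolean isAtomic) {
        if (isAtomic) {
            int total = recordSizes.stream().mapToInt(Integer::intValue).sum();
            // All records must share one batch, so reject before touching any state.
            if (total > maxBatchSize) {
                throw new TooLargeException(total + " bytes exceeds the maximum of " + maxBatchSize);
            }
            // Make room by flushing the current batch if the group does not fit in it.
            if (currentSize + total > maxBatchSize) flush();
        }
        for (int size : recordSizes) {
            if (!isAtomic) {
                // Check the fit before applying each record, so nothing has to be undone.
                if (size > maxBatchSize) {
                    throw new TooLargeException(size + " bytes exceeds the maximum of " + maxBatchSize);
                }
                if (currentSize + size > maxBatchSize) flush();
            }
            current.add(size);
            currentSize += size;
        }
    }

    /** Moves the current batch, if non-empty, to the list of flushed batches. */
    void flush() {
        if (current.isEmpty()) return;
        flushed.add(current);
        current = new ArrayList<>();
        currentSize = 0;
    }

    List<List<Integer>> flushedBatches() { return flushed; }

    List<Integer> pendingBatch() { return current; }
}
```

With a maximum of 100 bytes and four 30-byte records, the atomic append is rejected outright, while the non-atomic append flushes a first batch of three records and leaves the fourth pending. A large set of records can therefore always make progress, which is the motivation stated above.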



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -936,62 +941,90 @@ private void append(
 ));
 }
 
-// Compute the estimated size of the records.
-int estimatedSize = AbstractRecords.estimateSizeInBytes(
-currentBatch.builder.magic(),
-compression.type(),
-recordsToAppend
-);
+if (isAtomic) {

Review Comment:
   > One other thing I noticed is the ordering. For the atomic case, we create 
the batch first and then replay whereas non-atomic we do the replay then add to 
batch. Not sure if it makes a big difference though since we moved where we 
enqueue the event.
   
   This is a very good point and the code was wrong. I changed it to check if 
the record can fit in the batch before replaying. I added a test for this too.






Re: [PR] KAFKA-16584 Make log processing summary configurable or debug [kafka]

2024-07-02 Thread via GitHub


dujian0068 closed pull request #16428: KAFKA-16584 Make log processing summary 
configurable or debug
URL: https://github.com/apache/kafka/pull/16428





Re: [PR] KAFKA-17050: Revert `group.version` [kafka]

2024-07-02 Thread via GitHub


dajac merged PR #16482:
URL: https://github.com/apache/kafka/pull/16482





Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662545695


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -936,62 +941,90 @@ private void append(
 ));
 }
 
-// Compute the estimated size of the records.
-int estimatedSize = AbstractRecords.estimateSizeInBytes(
-currentBatch.builder.magic(),
-compression.type(),
-recordsToAppend
-);
+if (isAtomic) {
+// Compute the estimated size of the records.
+int estimatedSize = AbstractRecords.estimateSizeInBytes(
+currentBatch.builder.magic(),
+compression.type(),
+recordsToAppend
+);
 
-// Check if the current batch has enough space. We check is 
before
-// replaying the records in order to avoid having to revert 
back
-// changes if the records do not fit within a batch.
-if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-throw new RecordTooLargeException("Message batch size is " 
+ estimatedSize +
-" bytes in append to partition " + tp + " which 
exceeds the maximum " +
-"configured size of " + currentBatch.maxBatchSize + 
".");
-}
+// Check if the current batch has enough space. We check 
is before
+// replaying the records in order to avoid having to 
revert back
+// changes if the records do not fit within a batch.
+if (estimatedSize > 
currentBatch.builder.maxAllowedBytes()) {
+throw new RecordTooLargeException("Message batch size 
is " + estimatedSize +
+" bytes in append to partition " + tp + " which 
exceeds the maximum " +
+"configured size of " + currentBatch.maxBatchSize 
+ ".");
+}
 
-if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-// Otherwise, we write the current batch, allocate a new 
one and re-verify
-// whether the records fit in it.
-// If flushing fails, we don't catch the exception in 
order to let
-// the caller fail the current operation.
-flushCurrentBatch();
-maybeAllocateNewBatch(
-producerId,
-producerEpoch,
-verificationGuard,
-currentTimeMs
-);
+if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+// Otherwise, we write the current batch, allocate a 
new one and re-verify
+// whether the records fit in it.
+// If flushing fails, we don't catch the exception in 
order to let
+// the caller fail the current operation.
+flushCurrentBatch();
+maybeAllocateNewBatch(
+producerId,
+producerEpoch,
+verificationGuard,
+currentTimeMs
+);
+}
 }
 
-// Add the event to the list of pending events associated with 
the batch.
-currentBatch.deferredEvents.add(event);
-
 try {
-// Apply record to the state machine.
-if (replay) {
-for (int i = 0; i < records.size(); i++) {
-// We compute the offset of the record based on 
the last written offset. The
-// coordinator is the single writer to the 
underlying partition so we can
-// deduce it like this.
+for (int i = 0; i < records.size(); i++) {
+U recordToReplay = records.get(i);
+SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+if (replay) {
 coordinator.replay(
-currentBatch.nextOffset + i,
+currentBatch.nextOffset,
 producerId,
 producerEpoch,
-records.get(i)
+recordToReplay
 );
 }
-}
 
-// Append to the batch.
-for (SimpleRecord record : 

Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-07-02 Thread via GitHub


cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1660761368


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/FailedProcessingException.java:
##


Review Comment:
   What do you think of moving this exception to `streams.errors.internals`?






Re: [PR] Revert "KAFKA-16154" and mark MV 3.8-IV0 as the latest production [kafka]

2024-07-02 Thread via GitHub


jlprat commented on PR #16400:
URL: https://github.com/apache/kafka/pull/16400#issuecomment-2203252464

   Failing tests shouldn't be related to the changes (all tests passed locally)





Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on code in PR #16491:
URL: https://github.com/apache/kafka/pull/16491#discussion_r1661073537


##
docs/ops.html:
##
@@ -3971,7 +3971,10 @@ Enter Migration Mode on the Brokers
 
 # KRaft controller quorum configuration
 controller.quorum.voters=3000@localhost:9093
-controller.listener.names=CONTROLLER
+controller.listener.names=CONTROLLER
+
+# If the ZK broker is using a generated broker ID, disable broker ID generation
+broker.id.generation.enable=false

Review Comment:
   @showuon that is a good question. Maybe we can describe two solutions for 
users:
   
   1. Set `broker.id.generation.enable=false`. During migration, all brokers 
should already have unique IDs, so this should be fine. 
   2. Increase `reserved.broker.max.id`. Users need to know the max node ID in 
the cluster.






Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-02 Thread via GitHub


yazgoo commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1661056552


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -131,6 +131,27 @@ public void testTaskAssignmentWhenWorkerJoins() {
 assertEmptyAssignment();
 }
 
+@Test
+public void checkIndividualConnectorBalance() {
+connectors.clear();
+addNewConnector("connector1", 12);
+performStandardRebalance();
+addNewConnector("connector2", 12);
+performStandardRebalance();
+addNewEmptyWorkers("worker2");
+performStandardRebalance();
+performStandardRebalance();
+addNewEmptyWorkers("worker3");
+performStandardRebalance();
+performStandardRebalance();
+assertEquals(3, memberAssignments.size());
+memberAssignments.forEach((k, v) -> {

Review Comment:
   I refactored everything in `BalancedIterator` and added it to assignTasks 






Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]

2024-07-02 Thread via GitHub


bbejeck commented on code in PR #16503:
URL: https://github.com/apache/kafka/pull/16503#discussion_r1662488787


##
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##


Review Comment:
   Agreed, I'm removing it.






[PR] KAFKA-16584 Make log processing summary configurable or debug [kafka]

2024-07-02 Thread via GitHub


dujian0068 opened a new pull request, #16509:
URL: https://github.com/apache/kafka/pull/16509

   [KAFKA-16584 Make log processing summary configurable or 
debug](https://issues.apache.org/jira/browse/KAFKA-16584)
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]

2024-07-02 Thread via GitHub


lucasbru commented on PR #16503:
URL: https://github.com/apache/kafka/pull/16503#issuecomment-2203138366

   @bbejeck yes, agreed. This one is good to go.





Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]

2024-07-02 Thread via GitHub


bbejeck commented on PR #16503:
URL: https://github.com/apache/kafka/pull/16503#issuecomment-2203129697

   > I wonder if we should set the config for all our integration tests to 
reduce the initial delay?
   @lucasbru - good idea, I'd prefer to do it in a follow up PR - WDYT?





Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on code in PR #16425:
URL: https://github.com/apache/kafka/pull/16425#discussion_r1661082490


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/SingleWriteMultiReadBenchmark.java:
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@State(Scope.Group)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class SingleWriteMultiReadBenchmark {
+private static final int TIMES = 100_000;
+
+@Param({"100"})
+private int mapSize;
+
+@Param({"0.1"})
+private double writePercentage;
+
+private Map concurrentHashMap;
+private Map copyOnWriteMap;
+
+private int writeTimes;
+private ImmutableMap pcollectionsImmutableMap;

Review Comment:
   it needs `volatile`, right? 
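
A minimal sketch of why the field would need `volatile`, assuming the benchmark's single writer publishes each updated immutable map by reassigning the field while reader threads load it concurrently. `PublishedMap` and its methods are hypothetical names, and plain `java.util.Map` copies stand in for `PCollectionsImmutableMap`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder used for illustration; not code from the PR.
public class PublishedMap {
    // volatile: a reader that loads this reference is guaranteed to see the
    // fully built map the writer stored. Without it, the Java memory model
    // gives no guarantee that readers ever observe the new reference.
    private volatile Map<Integer, Integer> current = Map.of();

    // Single writer: copy, modify, then publish by replacing the reference.
    public void put(int key, int value) {
        Map<Integer, Integer> copy = new HashMap<>(current);
        copy.put(key, value);
        current = Map.copyOf(copy);
    }

    // Readers only load the reference and read from an immutable snapshot.
    public Integer get(int key) {
        return current.get(key);
    }

    public int size() {
        return current.size();
    }
}
```

The maps themselves never change after publication, so the only shared mutable state is the reference, which is why the modifier belongs on the field rather than on the map type.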



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/SingleWriteMultiReadBenchmark.java:
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+

Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


frankvicky commented on code in PR #16491:
URL: https://github.com/apache/kafka/pull/16491#discussion_r1661081534


##
docs/ops.html:
##
@@ -3971,7 +3971,10 @@ Enter Migration Mode on the Brokers
 
 # KRaft controller quorum configuration
 controller.quorum.voters=3000@localhost:9093
-controller.listener.names=CONTROLLER
+controller.listener.names=CONTROLLER
+
+# If the ZK broker is using a generated broker ID, disable broker ID generation
+broker.id.generation.enable=false

Review Comment:
   >If we don't add this line: broker.id.generation.enable=false, will it 
always fail?
   
   If `broker.id.generation.enable=true` and the user does not set 
`reserved.broker.max.id` to a value greater than -1, it will definitely fail; 
conversely, `broker.id.generation.enable=false` will make 
`reserved.broker.max.id` ineffective, so it will definitely not fail.
   
   >If the user needs to enable broker.id.generation.enable in their cluster 
because it has been working for years, what could they do?
   
   It is recommended that the user adjusts `reserved.broker.max.id` to a 
sufficiently large value to facilitate future migrations.
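
A rough sketch of the rule under discussion, assuming the broker-side check behaves as I understand it: with generation enabled, an explicitly configured `broker.id` must not exceed `reserved.broker.max.id`, and with generation disabled the reserved range is not consulted. `BrokerIdCheck.isValid` is a hypothetical helper that paraphrases the rule; it is not Kafka's actual validation code:

```java
// Hypothetical paraphrase of the broker.id validation rule; not Kafka source.
public class BrokerIdCheck {
    public static boolean isValid(int brokerId, boolean generationEnabled, int reservedBrokerMaxId) {
        if (generationEnabled) {
            // -1 means "no explicit ID, generate one"; an explicit ID must
            // stay within the reserved range.
            return brokerId >= -1 && brokerId <= reservedBrokerMaxId;
        }
        // Generation disabled: reserved.broker.max.id has no effect,
        // but an explicit non-negative ID is required.
        return brokerId >= 0;
    }
}
```

Under this reading, a ZK broker whose generated ID is above the reserved range passes the check either by disabling generation or by raising `reserved.broker.max.id` to at least that ID.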



##
docs/ops.html:
##
@@ -3971,7 +3971,10 @@ Enter Migration Mode on the Brokers
 
 # KRaft controller quorum configuration
 controller.quorum.voters=3000@localhost:9093
-controller.listener.names=CONTROLLER
+controller.listener.names=CONTROLLER
+
+# If the ZK broker is using a generated broker ID, disable broker ID generation
+broker.id.generation.enable=false

Review Comment:
   Oh, as @chia7712 said, maybe the first question won't be a concern?






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-02 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1661004357


##
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##
@@ -69,6 +70,13 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
 }
   }
 
+  private def validateGroupName(
+name: String): Unit = {

Review Comment:
   Done



##
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##
@@ -114,6 +122,22 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
 val properties = new Properties()
 config.forEach((key, value) => properties.setProperty(key, value))
 ClientMetricsConfigs.validate(resource.name(), properties)
+  case GROUP =>
+validateGroupName(resource.name())
+val properties = new Properties()
+val nullGroupConfigs = new mutable.ArrayBuffer[String]()
+config.entrySet().forEach(e => {

Review Comment:
   Done



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -615,7 +615,6 @@ class KafkaServer(
ConfigType.USER -> 
new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.BROKER 
-> new BrokerConfigHandler(config, quotaManagers),
ConfigType.IP -> 
new IpConfigHandler(socketServer.connectionQuotas))
-

Review Comment:
   Done



##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -479,7 +479,6 @@ class ZkAdminManager(val config: KafkaConfig,
 
 resource -> ApiError.NONE
   }
-

Review Comment:
   Done



##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -524,7 +523,6 @@ class ZkAdminManager(val config: KafkaConfig,
 val configProps = 
this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
 prepareIncrementalConfigs(alterConfigOps, configProps, 
KafkaConfig.configKeys)
 alterBrokerConfigs(resource, validateOnly, configProps, 
configEntriesMap)
-

Review Comment:
   Done



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_DOC
+= "The timeout to detect client failures when using the consumer group 
protocol.";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_DOC
+= "The heartbeat interval given to the members of a consumer group.";
+
+public static final int DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45 * 
1000;
+
+public static final int DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5 * 
1000;

Review Comment:
   Done



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.

Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]

2024-07-02 Thread via GitHub


frankvicky commented on PR #16491:
URL: https://github.com/apache/kafka/pull/16491#issuecomment-2200234845

   Hi @showuon and @chia7712 
   I have added a description for `broker.id.generation.enable=false`, PTAL





Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on PR #16425:
URL: https://github.com/apache/kafka/pull/16425#issuecomment-2200207294

   @TaiJuWu could you please add a summary to the description?





Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-07-02 Thread via GitHub


OmniaGM commented on PR #15968:
URL: https://github.com/apache/kafka/pull/15968#issuecomment-2200102266

   maybe @AndrewJSchofield or @artemlivshits can have a look into this?





Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-02 Thread via GitHub


DL1231 commented on PR #15067:
URL: https://github.com/apache/kafka/pull/15067#issuecomment-2200071353

   @dajac I've updated the PR. Please take a look again. Thanks.





Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-02 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1661035915


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -285,6 +362,49 @@ Found problem:
   "Expected the default metadata.version to be 3.3-IV2")
   }
 
+  @Test
+  def testStandaloneModeWithArguments(): Unit = {
+val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAT",
+"-s"))
+val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, 
null)))
+val exitCode = StorageTool.runFormatCommand(namespace, config)
+val tempDirs = config.logDirs
+tempDirs.foreach(tempDir => {
+  val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME
+  val checkpointFilePath = 
Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID)
+  assertTrue(checkpointFilePath.toFile.exists)
+  
assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost"))
+  Utils.delete(new File(tempDir))
+})
+assertEquals(0, exitCode)
+  }
+
+//  @Test TODO
+//  def testControllerQuorumVotersWithArguments(): Unit = {
+//val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yQQjhMQB6JAT",
+//  "--controller-quorum-voters", "1@localhost:9092"))
+//val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, 
null)))

Review Comment:
   Not sure how to set the controller listener.






Re: [PR] KAFKA-17016: Align the behavior of GaugeWrapper and MeterWrapper [kafka]

2024-07-02 Thread via GitHub


FrankYang0529 commented on code in PR #16426:
URL: https://github.com/apache/kafka/pull/16426#discussion_r1661024169


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -284,26 +284,34 @@ class BrokerTopicMetrics(name: Option[String], 
remoteStorageEnabled: Boolean = f
   }
 
   case class GaugeWrapper(metricType: String) {
-@volatile private var gaugeObject: Gauge[Long] = _
-final private val gaugeLock = new Object
-final val aggregatedMetric = new AggregatedMetric()
+private final val removed = new AtomicBoolean(false)

Review Comment:
   Updated it. Thank you.






Re: [PR] KAFKA-17051: ApiKeys#toHtml should exclude the APIs having unstable latest version [kafka]

2024-07-02 Thread via GitHub


m1a2st commented on code in PR #16480:
URL: https://github.com/apache/kafka/pull/16480#discussion_r1661004622


##
clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java:
##
@@ -87,4 +88,15 @@ public void testApiScope() {
 "Found some APIs missing scope definition");
 }
 
+@Test
+public void testHtmlOnlyHaveStableApi() {
+String html = ApiKeys.toHtml();
+for (ApiKeys apiKeys : ApiKeys.clientApis()) {
+if (apiKeys.messageType.latestVersionUnstable()) {
+assertFalse(html.contains(apiKeys.name), "Html should not 
contain unstable api: " + apiKeys.name);

Review Comment:
   Thanks for your suggestion, I updated the test






Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-02 Thread via GitHub


muralibasani commented on PR #16325:
URL: https://github.com/apache/kafka/pull/16325#issuecomment-2200108407

   @jsancio , updated with controller option
   
   ```
   ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
config/kraft/server.properties -q 1@localhost:9093
   metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
dirs={/tmp/kraft-combined-logs: EMPTY})
   Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
   Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata
   
   
   muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh 
format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -s
 
   metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
dirs={/tmp/kraft-combined-logs: EMPTY})
   Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
   Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata
   
   
   muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh 
format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -q 
1@localhost:9093 -s
   Both --standalone and --controller-quorum-voters were set. Only one of the 
two flags can be set.
   ```





Re: [PR] KAFKA-16791: Add thread detection to ClusterTestExtensions [kafka]

2024-07-02 Thread via GitHub


chia7712 commented on PR #16499:
URL: https://github.com/apache/kafka/pull/16499#issuecomment-2200088200

   Could you please implement the thread detection in a separate class? 
That would have the following benefits:
   
   1. It is easy to write tests for the thread detection (yes, the test needs a test)
   2. We can apply it to other tests which don't use the new test infra
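
One possible shape for such a standalone class, as a sketch only: it snapshots the live threads before a test and reports the ones that appeared afterwards. `ThreadLeakDetector`, `snapshot`, and `leaked` are hypothetical names and do not come from the PR:

```java
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper; names are illustrative and not taken from the PR.
public final class ThreadLeakDetector {
    private final Set<Long> idsBefore;

    private ThreadLeakDetector(Set<Long> idsBefore) {
        this.idsBefore = idsBefore;
    }

    // Record the IDs of all live threads, e.g. before a test starts.
    public static ThreadLeakDetector snapshot() {
        Set<Long> ids = liveThreads().stream()
            .map(Thread::getId)
            .collect(Collectors.toSet());
        return new ThreadLeakDetector(ids);
    }

    // Threads that are alive now, were absent from the snapshot,
    // and match the caller's filter (for example a name prefix).
    public Set<Thread> leaked(Predicate<Thread> filter) {
        return liveThreads().stream()
            .filter(t -> !idsBefore.contains(t.getId()))
            .filter(filter)
            .collect(Collectors.toSet());
    }

    private static Set<Thread> liveThreads() {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .collect(Collectors.toSet());
    }
}
```

Because the class has no dependency on the test extension, it can be unit-tested by starting a throwaway thread, and any test could call `snapshot()` in setup and assert that `leaked(...)` is empty in teardown.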





Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-07-02 Thread via GitHub


C0urante commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1659130813


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -106,6 +108,17 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map<String, String> connectorConfigs) {
+List<ConfigValue> configValues = super.validate(connectorConfigs).configValues();
+new MirrorCheckpointConfig(connectorConfigs).validate().forEach(invalidConfig ->
+configValues.stream()
+.filter(conf -> conf.name().equals(invalidConfig.name()))
+.forEach(conf -> invalidConfig.errorMessages().forEach(msg -> conf.addErrorMessage(msg))));

Review Comment:
   Holy Java 8 Batman!
   
   We don't need `forEach` here, can simplify a bit:
   ```suggestion
   .forEach(conf -> conf.errorMessages().addAll(invalidConfig.errorMessages())));
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
 return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
 }
 
+public List<ConfigValue> validate() {
+List<ConfigValue> invalidConfigs = new ArrayList<>();
+
+if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && 
!this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
+Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new 
ConfigValue(EMIT_CHECKPOINTS_ENABLED))
+.forEach(configValue -> {
+configValue.addErrorMessage("MirrorCheckpointConnector 
can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+EMIT_CHECKPOINTS_ENABLED + " set to false");
+invalidConfigs.add(configValue);
+});
+}
+
+if 
("false".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true")))
 {

Review Comment:
   Why is `getBoolean` used above, but the `EMIT_OFFSET_SYNCS_ENABLED` property 
is manually read and parsed here?



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -319,11 +397,14 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, 
String topic, int numMes
 cluster.produce(topic, Integer.toString(i));
 }
 }
-
 private void awaitMirrorMakerStart(final MirrorMaker mm, final 
SourceAndTarget sourceAndTarget) throws InterruptedException {
+awaitMirrorMakerStart(mm, sourceAndTarget, CONNECTOR_CLASSES);
+}
+
+private void awaitMirrorMakerStart(final MirrorMaker mm, final 
SourceAndTarget sourceAndTarget, final  List> connectorClasses) throws 
InterruptedException {

Review Comment:
   If we want to be fancy, we can use varargs here:
   ```suggestion
   private void awaitMirrorMakerStart(final MirrorMaker mm, final 
SourceAndTarget sourceAndTarget, final >... connectorClasses) throws 
InterruptedException {
   ```
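
A small self-contained sketch of the varargs idea, using stand-in types rather than the real Connect classes. `VarargsOverload`, `Connector`, `SourceConnector`, `CheckpointConnector`, and `names` are all hypothetical; the point is only that a `Class<? extends ...>...` parameter lets callers list classes directly instead of building a `List`:

```java
import java.util.ArrayList;
import java.util.List;

public class VarargsOverload {
    // Hypothetical stand-ins for the connector types used by the test.
    public interface Connector { }
    public static class SourceConnector implements Connector { }
    public static class CheckpointConnector implements Connector { }

    // Class<? extends Connector> is not reifiable, so the compiler warns about
    // generic array creation at call sites; @SafeVarargs suppresses that and
    // is permitted on static, final, or private methods.
    @SafeVarargs
    public static List<String> names(Class<? extends Connector>... connectorClasses) {
        List<String> result = new ArrayList<>();
        for (Class<? extends Connector> clazz : connectorClasses) {
            result.add(clazz.getSimpleName());
        }
        return result;
    }
}
```

A caller can then write `VarargsOverload.names(SourceConnector.class, CheckpointConnector.class)` without wrapping the arguments in `Arrays.asList(...)`.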



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##
@@ -59,12 +59,13 @@ public void testMirrorCheckpointConnectorDisabled() {
 
 Set knownConsumerGroups = new HashSet<>();
 knownConsumerGroups.add(CONSUMER_GROUP);
+assertMirrorCheckpointConnectorDisabled(new 
MirrorCheckpointConnector(knownConsumerGroups, config));
+}
+
+private void 
assertMirrorCheckpointConnectorDisabled(MirrorCheckpointConnector connector) {

Review Comment:
   Is this change still necessary? Looks like it might have been left over from 
a previous draft?



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -187,6 +194,77 @@ public void testSingleNodeCluster() throws Exception {
 }
 }
 
+@Test
+public void testClusterWithEmitOffsetDisabled() throws Exception {
+Properties brokerProps = new Properties();
+EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
+EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
+
+try (Admin adminB = clusterB.createAdminClient()) {
+
+// Cluster aliases
+final String a = "A";
+final String b = "B";
+final String ab = a + "->" + b;
+final String ba = b + "->" + a;

Review Comment:
   Currently unused. We can either remove it, or if we want to be more explicit 
in our properties setup, we can use it to explicitly disable the b->a flow by 
setting `ba + ".enabled"` to false.
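
A sketch of the second option, assuming the test builds its MirrorMaker properties with `java.util.Properties` and that flows are toggled through `<source>-><target>.enabled` keys. `MirrorFlowProps.oneWay` is a hypothetical helper, not part of the test:

```java
import java.util.Properties;

// Hypothetical helper showing an explicit one-way flow setup.
public class MirrorFlowProps {
    public static Properties oneWay(String a, String b) {
        final String ab = a + "->" + b;
        final String ba = b + "->" + a;

        Properties props = new Properties();
        props.setProperty("clusters", a + ", " + b);
        // Replicate a -> b only; state the disabled reverse flow explicitly
        // instead of relying on the default.
        props.setProperty(ab + ".enabled", "true");
        props.setProperty(ba + ".enabled", "false");
        return props;
    }
}
```

Spelling out the disabled direction keeps the `ba` variable in use and makes the intent of the test readable from its configuration alone.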



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
 

[PR] (WIP DO NOT MERGE) KAFKA-16502: Fix flaky EOSUncleanShutdownIntegrationTest [kafka]

2024-07-02 Thread via GitHub


mjsax opened a new pull request, #16490:
URL: https://github.com/apache/kafka/pull/16490

   I could not find a root cause, but the test times out waiting for KS to go into 
ERROR state inside the finally block.
   
   So it seems we might be swallowing some error? I'm trying to add some more logging. The 
logs I have access to do not contain any errors...




