Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1663537165 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -730,7 +922,20 @@ private void assertAssignment( assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size()); for (String memberId : computedGroupAssignment.members().keySet()) { Map> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).partitions(); -assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); +Map> expectedAssignmentForMember = expectedAssignment.get(memberId); + +// Compare the content of the maps by entries. +assertEquals(expectedAssignmentForMember.size(), computedAssignmentForMember.size()); +for (Map.Entry> entry : expectedAssignmentForMember.entrySet()) { +Set expectedSet = entry.getValue(); +Set computedSet = computedAssignmentForMember.get(entry.getKey()); + +// Convert both sets to HashSet for comparison. +Set normalizedExpectedSet = new HashSet<>(expectedSet); +Set normalizedComputedSet = new HashSet<>(computedSet); + +assertEquals(normalizedExpectedSet, normalizedComputedSet); +} Review Comment: yeah, I think it didn't work initially, but changed it now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
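The entry-by-entry comparison in the diff above can be sketched in isolation as follows. This is a minimal illustration, not the test's actual helper: the class name `AssignmentComparison`, the method `sameContent`, and the `String`/`Integer` type parameters are assumptions made for the example. Note that for sets that follow the `java.util.Set` contract, `equals` is already content-based across implementations, so copying into a `HashSet` only matters when a set type deviates from that contract.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, named for this example only.
final class AssignmentComparison {

    // Returns true when both maps have the same keys and every key maps to
    // sets with the same elements, regardless of the concrete Set type.
    static <K, V> boolean sameContent(Map<K, Set<V>> expected, Map<K, Set<V>> computed) {
        if (expected.size() != computed.size()) {
            return false;
        }
        for (Map.Entry<K, Set<V>> entry : expected.entrySet()) {
            Set<V> computedSet = computed.get(entry.getKey());
            if (computedSet == null) {
                return false;
            }
            // Normalize both sides to HashSet before comparing.
            Set<V> normalizedExpected = new HashSet<>(entry.getValue());
            Set<V> normalizedComputed = new HashSet<>(computedSet);
            if (!normalizedExpected.equals(normalizedComputed)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> expected = new HashMap<>();
        expected.put("topic1", new TreeSet<>(Set.of(0, 1, 2)));

        Map<String, Set<Integer>> computed = new HashMap<>();
        computed.put("topic1", new HashSet<>(Set.of(2, 1, 0)));

        System.out.println(sameContent(expected, computed));
    }
}
```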
[jira] [Assigned] (KAFKA-17064) New consumer assign should update assignment in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PoAn Yang reassigned KAFKA-17064: - Assignee: PoAn Yang > New consumer assign should update assignment in background thread > - > > Key: KAFKA-17064 > URL: https://issues.apache.org/jira/browse/KAFKA-17064 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Assignee: PoAn Yang >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.9.0 > > > With the new async consumer, the subscriptionState object is shared between > the app thread and the background thread, but in principle all updates to the > assignment should happen in the background thread, to avoid race conditions. > Note that it's in the background where most updates to the assignment happen, > as a result of app event processing (unsubscribe, reconciliations, etc.). > We've faced such races in places like unsubscribe and close, fixed by > ensuring that all assignment updates happen in the background, and this also > needs to be reviewed for consumer.assign. The current implementation > triggers an AssignmentChange event that is processed in the background, but > that event is not really changing the assignment. It only commits offsets, > and the assignment is updated in the app thread by calling > subscriptionState.assignFromUser. > We should consider moving the assignment update to the background thread, as > part of the AssignmentChangeEvent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
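The pattern proposed in the issue (the app thread only enqueues an event, and the background thread is the single writer of the assignment) might look roughly like the sketch below. It is not the AsyncKafkaConsumer implementation: every name here (`BackgroundAssignmentSketch`, `AssignmentChange`, the 5-second wait, partitions represented as strings) is a hypothetical stand-in chosen for illustration.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: names and timeouts are assumptions, not Kafka classes.
final class BackgroundAssignmentSketch {

    // Event carrying the requested partitions and a future completed once applied.
    static final class AssignmentChange {
        final Set<String> partitions;
        final CompletableFuture<Void> applied = new CompletableFuture<>();

        AssignmentChange(Set<String> partitions) {
            this.partitions = Set.copyOf(partitions);
        }
    }

    private final BlockingQueue<AssignmentChange> events = new LinkedBlockingQueue<>();
    // Written only by the background thread; volatile so readers see the latest value.
    private volatile Set<String> assignment = Set.of();
    private volatile boolean running = true;
    private final Thread background;

    BackgroundAssignmentSketch() {
        background = new Thread(this::runLoop, "background-sketch");
        background.setDaemon(true);
        background.start();
    }

    private void runLoop() {
        while (running) {
            try {
                AssignmentChange event = events.poll(100, TimeUnit.MILLISECONDS);
                if (event == null) {
                    continue;
                }
                assignment = event.partitions; // the only place the assignment is mutated
                event.applied.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Called from the application thread: enqueue the change, then wait until
    // the background thread has applied it.
    void assign(Set<String> partitions) {
        AssignmentChange event = new AssignmentChange(partitions);
        events.add(event);
        try {
            event.applied.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Assignment was not applied in time", e);
        }
    }

    Set<String> assignment() {
        return assignment;
    }

    void close() {
        running = false;
        background.interrupt();
    }
}
```

Because only one thread ever writes the assignment, an unsubscribe or reconciliation processed from the same queue cannot interleave with a manual assign, which is the race the issue describes.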
Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
yashmayya commented on code in PR #15302: URL: https://github.com/apache/kafka/pull/15302#discussion_r1663466038 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -976,27 +988,53 @@ private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlt * * @param connectorName the name of the sink connector whose offsets are to be verified * @param expectedTopic the name of the Kafka topic that the sink connector is consuming from - * @param expectedPartitions the number of partitions that exist for the Kafka topic Review Comment: Stray change?
Re: [PR] KAFKA-16918: TestUtils#assertFutureThrows should use future.get with timeout [kafka]
showuon commented on code in PR #16264: URL: https://github.com/apache/kafka/pull/16264#discussion_r1663413976 ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -558,14 +558,13 @@ public static Set generateRandomTopicPartitions(int numTopic, in */ public static T assertFutureThrows(Future future, Class exceptionCauseClass) { try { -future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.SECONDS); +future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS); fail("Future should throw expected exception " + exceptionCauseClass.getSimpleName() + " but succeed."); } catch (TimeoutException | InterruptedException | ExecutionException e) { -assertInstanceOf(exceptionCauseClass, e.getCause(), -"Unexpected exception cause " + e.getCause()); +assertInstanceOf(exceptionCauseClass, e.getCause(), "Expected a" + exceptionCauseClass.getSimpleName() + "but got" + e.getCause()); return exceptionCauseClass.cast(e.getCause()); } -return null; +throw new RuntimeException("Future should throw expected exception but unexpected error happened."); Review Comment: I saw you addressed 2 of my 3 comments: > 1. DEFAULT_MAX_WAIT_MS is a millisecond time unit, not seconds. Same comment applied to L583. It's fixed now. Thanks. > 2. Now, if the future throws TimeoutException or InterruptedException, it will fail as expected. But what error message will we get? In your opinion, do you think that's helpful for troubleshooting? Could you first let me know what error message we will get when `TimeoutException or InterruptedException` is thrown? > 3. What happened if the future throws CancellationException? return null? Is there any better way to handle it? Now, it will throw RuntimeException, which works, too. But I think we can directly `fail` the test, like L 562 did.
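One way to address the three review points together is to handle each outcome of `Future.get` in its own branch, so that a timeout, an interruption, or a cancellation fails with a message that names what happened. The sketch below is an illustration only, not the code in the PR: it uses plain `AssertionError` instead of JUnit's `fail`/`assertInstanceOf` so that it is self-contained, and `FutureAssertions` and `MAX_WAIT_MS` are hypothetical names. The separation matters because a `TimeoutException` raised by `Future.get` typically carries no cause, so checking `e.getCause()` against the expected class tells the reader very little.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, named for this example only.
final class FutureAssertions {

    // Assumed wait budget in milliseconds; the real constant may differ.
    static final long MAX_WAIT_MS = 15_000L;

    static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> expectedCause) {
        try {
            future.get(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The future failed: verify that the cause has the expected type.
            Throwable cause = e.getCause();
            if (!expectedCause.isInstance(cause)) {
                throw new AssertionError(
                    "Expected a " + expectedCause.getSimpleName() + " but got " + cause, cause);
            }
            return expectedCause.cast(cause);
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within " + MAX_WAIT_MS + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        } catch (CancellationException e) {
            throw new AssertionError("Future was cancelled instead of failing", e);
        }
        // Reaching this point means get() returned normally.
        throw new AssertionError(
            "Expected " + expectedCause.getSimpleName() + " but the future completed successfully");
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        IllegalStateException cause = assertFutureThrows(failed, IllegalStateException.class);
        System.out.println(cause.getMessage());
    }
}
```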
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1663408286 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -2302,6 +2303,61 @@ public void testRebalanceMetricsOnFailedRebalance() { assertEquals(-1d, getMetricValue(metrics, rebalanceMetricsManager.lastRebalanceSecondsAgo)); } +@Test +public void testLeaveGroupWhenStateIsFatal() { +MembershipManagerImpl membershipManager = createMemberInStableState(null); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); +membershipManager.transitionToFatal(); +assertEquals(MemberState.FATAL, membershipManager.state()); + +subscriptionState.assignFromUser(Collections.singleton(new TopicPartition("topic", 0))); +assertEquals(1, subscriptionState.numAssignedPartitions()); Review Comment: I'm not sure we would ever be able to achieve this path we're mocking here with a real consumer, so I'm wondering if this is even a valid scenario to test? (this test, and the ones below for fenced and stale too). A member in fatal/fenced/stale state means it was part of a group (automatic assignment after subscribe), and then we're mocking a manual assignment that takes effect and is cleared on leaveGroup. I expect that would never be possible because the consumer does not allow mixing the subscription types, so the call to assign would fail before actually updating the assignment and numAssignedPartitions (a consumer would have to unsubscribe or assign with empty to then be able to manually assign the topic0 partition). Just for the record, I totally see the value in the `testLeaveGroupWhenStateIsUnsubscribe`, which is covering the gap we had and it's a valid combination since it's not mixing subscription types, but I struggle to see this call to assign happening when a member is fatal/fenced/stale?
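The point about subscription types being mutually exclusive can be modelled with a small state holder. This is a simplified sketch under stated assumptions, not Kafka's `SubscriptionState`: the class `SubscriptionTypeSketch`, its enum values, and the use of strings for partitions are all invented for the example. It shows why, for a member that joined a group through `subscribe`, a later manual `assign` is rejected before the assignment is touched.

```java
import java.util.Set;

// Hypothetical model of mutually exclusive subscription types.
final class SubscriptionTypeSketch {

    enum Type { NONE, AUTO_TOPICS, USER_ASSIGNED }

    private Type type = Type.NONE;
    private Set<String> assigned = Set.of();

    // Group-managed subscription; partitions would be assigned later by the group.
    void subscribe(Set<String> topics) {
        requireType(Type.AUTO_TOPICS);
    }

    // Manual assignment; an empty set is treated like unsubscribe.
    void assign(Set<String> partitions) {
        if (partitions.isEmpty()) {
            unsubscribe();
            return;
        }
        requireType(Type.USER_ASSIGNED); // throws before the assignment is modified
        assigned = Set.copyOf(partitions);
    }

    void unsubscribe() {
        type = Type.NONE;
        assigned = Set.of();
    }

    int numAssignedPartitions() {
        return assigned.size();
    }

    private void requireType(Type requested) {
        if (type != Type.NONE && type != requested) {
            throw new IllegalStateException(
                "Subscription types are mutually exclusive: current=" + type + ", requested=" + requested);
        }
        type = requested;
    }

    public static void main(String[] args) {
        SubscriptionTypeSketch state = new SubscriptionTypeSketch();
        state.subscribe(Set.of("topic"));
        try {
            state.assign(Set.of("topic-0"));
        } catch (IllegalStateException e) {
            System.out.println("assign rejected: " + e.getMessage());
        }
        System.out.println(state.numAssignedPartitions());
    }
}
```

In this model the only ways to reach a manual assignment after `subscribe` are `unsubscribe()` or `assign` with an empty set, which matches the path the review comment describes.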
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on PR #16449: URL: https://github.com/apache/kafka/pull/16449#issuecomment-2204975848 Hey @FrankYang0529, thanks a lot for the fix! Left some comments.
Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]
lianetm commented on code in PR #16449: URL: https://github.com/apache/kafka/pull/16449#discussion_r1663411634 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -2077,6 +2077,19 @@ void testReaperInvokedInPoll() { verify(backgroundEventReaper).reap(time.milliseconds()); } +@Test +public void testUnsubscribeWithoutGroupId() { +consumer = newConsumerWithoutGroupId(); +completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Review Comment: do we need this? I wouldn't expect we have to mock anything related to committed offsets or fetching because we're not polling in the test (or using committed offsets in any way) right?
Re: [PR] KAFKA-15265: Remote fetch throttle metrics [kafka]
showuon commented on PR #16087: URL: https://github.com/apache/kafka/pull/16087#issuecomment-2204973870 @abhijeetk88 , there is a format error. Please help fix it. https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16087/7/pipeline/10
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
frankvicky commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204956965 > @frankvicky , any thoughts about it? I'd also like to hear since you are the author of this PR. Hi @showuon ! The reason why broker.id.generation.enable might need to be disabled has been explained more precisely by @chia7712 than in my previous explanation. Regarding the second point from @chia7712 , I'm thinking about the potential drawbacks of allowing node.id to be mutable. IMHO, could these changes lead to inconsistencies in cluster coordination and communication? Might they also cause any unpredictable behavior?
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
chia7712 commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204940529 > If my understanding is correct, the possible error is the broker id will be ignored and the generated ID might exceed the range of max id, which cause the error reported in JIRA. Is that right? Yep, ignoring the broker id can pass the reserved ids check but it fails on the default node.id. Explicitly defining "broker.id=generated id" can update node.id to run KRaft components in the zk broker, but it fails the reserved ids check. > We should take the generated ID into node.id. Yes. Specifically, we can also define node.id when updating broker.id manually, no matter how we get the broker.id.
[jira] [Commented] (KAFKA-17064) New consumer assign should update assignment in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862635#comment-17862635 ] Lianet Magrans commented on KAFKA-17064: Sure, you can take it, thanks for helping out! Let me know if you have any other questions, and ping me when there is a PR, happy to help with the review.
[jira] [Updated] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API
[ https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lin Siyuan updated KAFKA-17041: --- Attachment: image-2024-07-03-10-22-20-432.png > Add pagination when describe large set of metadata via Admin API > - > > Key: KAFKA-17041 > URL: https://issues.apache.org/jira/browse/KAFKA-17041 > Project: Kafka > Issue Type: Task > Components: admin >Reporter: Omnia Ibrahim >Priority: Major > Attachments: image-2024-07-03-10-21-15-315.png, > image-2024-07-03-10-22-20-432.png > > > Some requests made via the Admin API time out on large clusters or on clusters with a > large set of specific metadata. For example, OffsetFetchRequest and > DescribeLogDirsRequest time out due to the large number of partitions on the cluster. > Also, DescribeProducersRequest and ListTransactionsRequest time out due to too > many short-lived PIDs or too many hanging transactions. > [KIP-1062: Introduce Pagination for some requests used by Admin > API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1062%3A+Introduce+Pagination+for+some+requests+used+by+Admin+API]
[jira] [Commented] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API
[ https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862634#comment-17862634 ] Lin Siyuan commented on KAFKA-17041: !image-2024-07-03-10-22-20-432.png! Hello [~omnia_h_ibrahim] , there is a small word mistake here
[jira] [Updated] (KAFKA-17041) Add pagination when describe large set of metadata via Admin API
[ https://issues.apache.org/jira/browse/KAFKA-17041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lin Siyuan updated KAFKA-17041: --- Attachment: image-2024-07-03-10-21-15-315.png
[jira] [Assigned] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-15524: - Assignee: (was: Chris Egerton) > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks > -- > > Key: KAFKA-15524 > URL: https://issues.apache.org/jira/browse/KAFKA-15524 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.6.0, 3.5.1 >Reporter: Josep Prat >Priority: Major > Labels: flaky, flaky-test > > Last seen: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/] > > h3. Error Message > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out.{code} > h3. Stacktrace > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at > 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at >
[jira] [Reopened] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reopened KAFKA-15524: ---
[jira] [Resolved] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
[ https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-15524. --- Resolution: Later
Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
C0urante commented on code in PR #15302: URL: https://github.com/apache/kafka/pull/15302#discussion_r1663319861 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -801,6 +806,11 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1, "Connector tasks did not start in time."); +// Make sure the tasks' consumers have had a chance to actually form a group +// (otherwise, the reset request will succeed because there won't be any active consumers) +verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION, Review Comment: Great point, I've removed mention of KAFKA-15524 from this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lin Siyuan updated KAFKA-17066: --- Description: The updateFetchPositions func in the new consumer performs several actions based on the assigned partitions from the subscriptionState. The way it's currently implemented, it fetches committed offsets for partitions that required a position (retrieved from subscription state in the app thread), and then resets positions for the partitions still needing one (retrieved from the subscription state but in the background thread). This is problematic, given that the assignment/subscriptionState may change in the background thread at any time (e.g. new partitions reconciled), so we could end up resetting positions to the partition offsets for a partition for which we never even attempted to retrieve committed offsets. Consider this sequence for a consumer that owns partition tp0:
* consumer owns tp0
* app thread -> updateFetchPositions triggers initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned partitions requiring a position (taking them from subscriptions.initializingPartitions()). This will fetch committed offsets for tp0 only.
* background thread -> receives new partition tp1 and completes reconciliation (adds it to the subscription state as INITIALIZING, requires a position)
* app thread -> updateFetchPositions resets positions for all partitions that still don't have a valid position after initWithCommittedOffsetsIfNeeded (taking them from subscriptionState.partitionsNeedingReset). This will mistakenly consider that it should reset tp1 to the partition offsets, when in reality it never even tried fetching the committed offsets for it because it wasn't assigned when initWithCommittedOffsetsIfNeeded happened.
We should consider moving the updateFetchPositions as a single event to the background, that would safely use the subscriptionState object and apply all actions involved in the updateFetchPositions to the same consistent set of partitions assigned at that moment. was: {color:red}colored text{color}The updateFetchPositions func in the new consumer performs several actions based on the assigned partitions from the subscriptionState. The way it's currently implemented, it fetches committed offsets for partitions that required a position (retrieved from subscription state in the app thread), and then resets positions for the partitions still needing one (retrieved from the subscription state but in the background thread). This is problematic, given that the assignment/subscriptionState may change in the background thread at any time (e.g. new partitions reconciled), so we could end up resetting positions to the partition offsets for a partition for which we never even attempted to retrieve committed offsets. Consider this sequence for a consumer that owns partition tp0:
* consumer owns tp0
* app thread -> updateFetchPositions triggers initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned partitions requiring a position (taking them from subscriptions.initializingPartitions()). This will fetch committed offsets for tp0 only.
* background thread -> receives new partition tp1 and completes reconciliation (adds it to the subscription state as INITIALIZING, requires a position)
* app thread -> updateFetchPositions resets positions for all partitions that still don't have a valid position after initWithCommittedOffsetsIfNeeded (taking them from subscriptionState.partitionsNeedingReset). This will mistakenly consider that it should reset tp1 to the partition offsets, when in reality it never even tried fetching the committed offsets for it because it wasn't assigned when initWithCommittedOffsetsIfNeeded happened.
We should consider moving the updateFetchPositions as a single event to the background, that would safely use the subscriptionState object and apply all actions involved in the updateFetchPositions to the same consistent set of partitions assigned at that moment. > New consumer updateFetchPositions should perform all operations in background > thread > > > Key: KAFKA-17066 > URL: https://issues.apache.org/jira/browse/KAFKA-17066 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Priority: Blocker > Fix For: 3.9.0 > > > The updateFetchPositions func in the new consumer performs several actions > based on the assigned partitions from the subscriptionState. The way it's > currently implemented, it fetches committed offsets for partitions that > required a position (retrieved from subscription state in the app thread), > and then resets positions for the
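The race in the sequence above, and the effect of working from one consistent set of partitions, can be sketched in a few lines of Java. This is a simplified model under stated assumptions, not the consumer's actual code: `PositionModel`, its `State` enum, and all method names are hypothetical stand-ins for the shared subscriptionState and the two steps of updateFetchPositions.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the shared subscription state.
final class PositionModel {
    enum State { INITIALIZING, HAS_POSITION, RESET }

    private final Map<String, State> partitions = new LinkedHashMap<>();

    // Called when a partition is added to the assignment (e.g. after reconciliation).
    void assign(String tp) {
        partitions.put(tp, State.INITIALIZING);
    }

    State stateOf(String tp) {
        return partitions.get(tp);
    }

    // Snapshot of the partitions that currently need a position.
    Set<String> initializing() {
        Set<String> result = new HashSet<>();
        partitions.forEach((tp, state) -> {
            if (state == State.INITIALIZING) result.add(tp);
        });
        return result;
    }

    // Step 1: apply committed offsets to the requested partitions that have one.
    void applyCommitted(Set<String> requested, Set<String> withCommittedOffset) {
        for (String tp : requested) {
            if (withCommittedOffset.contains(tp) && partitions.get(tp) == State.INITIALIZING) {
                partitions.put(tp, State.HAS_POSITION);
            }
        }
    }

    // Step 2, racy variant: re-reads the live state, so it also resets partitions
    // that were assigned after step 1 and never had their committed offsets fetched.
    void resetAllInitializing() {
        partitions.replaceAll((tp, state) -> state == State.INITIALIZING ? State.RESET : state);
    }

    // Step 2, consistent variant: only touches partitions that took part in step 1.
    void resetWithin(Set<String> snapshot) {
        partitions.replaceAll((tp, state) ->
            snapshot.contains(tp) && state == State.INITIALIZING ? State.RESET : state);
    }
}
```

With tp0 assigned and committed, and tp1 assigned between the two steps, `resetAllInitializing()` moves tp1 to `RESET`, while `resetWithin(snapshot)` leaves it `INITIALIZING` so that a later pass can look up its committed offsets first.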
[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lin Siyuan updated KAFKA-17066: --- Description: {color:red}colored text{color}The updateFetchPositions func in the new consumer performs several actions based on the assigned partitions from the subscriptionState. The way it's currently implemented, it fetches committed offsets for partitions that required a position (retrieved from subscription state in the app thread), and then resets positions for the partitions still needing one (retrieved from the subscription state but in the background thread). This is problematic, given that the assignment/subscriptionState may change in the background thread at any time (e.g. new partitions reconciled), so we could end up resetting positions to the partition offsets for a partition for which we never even attempted to retrieve committed offsets. Consider this sequence for a consumer that owns partition tp0:
* consumer owns tp0
* app thread -> updateFetchPositions triggers initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned partitions requiring a position (taking them from subscriptions.initializingPartitions()). This will fetch committed offsets for tp0 only.
* background thread -> receives new partition tp1 and completes reconciliation (adds it to the subscription state as INITIALIZING, requires a position)
* app thread -> updateFetchPositions resets positions for all partitions that still don't have a valid position after initWithCommittedOffsetsIfNeeded (taking them from subscriptionState.partitionsNeedingReset). This will mistakenly consider that it should reset tp1 to the partition offsets, when in reality it never even tried fetching the committed offsets for it because it wasn't assigned when initWithCommittedOffsetsIfNeeded happened.
We should consider moving the updateFetchPositions as a single event to the background, that would safely use the subscriptionState object and apply all actions involved in the updateFetchPositions to the same consistent set of partitions assigned at that moment. was: The updateFetchPositions func in the new consumer performs several actions based on the assigned partitions from the subscriptionState. The way it's currently implemented, it fetches committed offsets for partitions that required a position (retrieved from subscription state in the app thread), and then resets positions for the partitions still needing one (retrieved from the subscription state but in the background thread). This is problematic, given that the assignment/subscriptionState may change in the background thread at any time (e.g. new partitions reconciled), so we could end up resetting positions to the partition offsets for a partition for which we never even attempted to retrieve committed offsets. Consider this sequence for a consumer that owns partition tp0:
* consumer owns tp0
* app thread -> updateFetchPositions triggers initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned partitions requiring a position (taking them from subscriptions.initializingPartitions()). This will fetch committed offsets for tp0 only.
* background thread -> receives new partition tp1 and completes reconciliation (adds it to the subscription state as INITIALIZING, requires a position)
* app thread -> updateFetchPositions resets positions for all partitions that still don't have a valid position after initWithCommittedOffsetsIfNeeded (taking them from subscriptionState.partitionsNeedingReset). This will mistakenly consider that it should reset tp1 to the partition offsets, when in reality it never even tried fetching the committed offsets for it because it wasn't assigned when initWithCommittedOffsetsIfNeeded happened.
We should consider moving the updateFetchPositions as a single event to the background, that would safely use the subscriptionState object and apply all actions involved in the updateFetchPositions to the same consistent set of partitions assigned at that moment. > New consumer updateFetchPositions should perform all operations in background > thread > > > Key: KAFKA-17066 > URL: https://issues.apache.org/jira/browse/KAFKA-17066 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Priority: Blocker > Fix For: 3.9.0 > > > {color:red}colored text{color}The updateFetchPositions func in the new consumer > performs several actions based on the assigned partitions from the > subscriptionState. The way it's currently implemented, it fetches committed > offsets for partitions that required a position (retrieved from subscription > state in the app thread), and then
Re: [PR] KAFKA-16529; Implement raft response handling [kafka]
jsancio commented on code in PR #16454: URL: https://github.com/apache/kafka/pull/16454#discussion_r1663345822 ## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ## @@ -86,10 +87,10 @@ protected CandidateState( this.backoffTimer = time.timer(0); this.log = logContext.logger(CandidateState.class); -for (Integer voterId : voters.voterIds()) { -voteStates.put(voterId, State.UNRECORDED); +for (ReplicaKey voter : voters.voterKeys()) { +voteStates.put(voter.id(), new VoterState(voter)); } -voteStates.put(localId, State.GRANTED); +voteStates.get(localId).setState(State.GRANTED); Review Comment: The invariant is that only voters can transition to `CandidateState`, which means that `voteStates` will always contain the local id. I added this check to verify that: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/CandidateState.java#L67-L76 Having said that, I was looking at some of the transitions and I see that we don't satisfy that invariant in all cases. I filed this jira for me to revisit this: [Fix the transition to CandidateState](https://issues.apache.org/jira/browse/KAFKA-17067). I probably should work on this Jira before we merge the AddRaftVoter RPC PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17067) Fix the transition to CandidateState
José Armando García Sancio created KAFKA-17067: -- Summary: Fix the transition to CandidateState Key: KAFKA-17067 URL: https://issues.apache.org/jira/browse/KAFKA-17067 Project: Kafka Issue Type: Sub-task Reporter: José Armando García Sancio Assignee: José Armando García Sancio Fix For: 3.9.0 Only voters should be able to transition to CandidateState. The current code allows VotedState to transition to CandidateState, but not every replica in VotedState is a voter. This was relaxed as part of KIP-853: non-voters are allowed to vote for leaders even though they locally think they are not voters. -- This message was sent by Atlassian Jira (v8.20.10#820010)
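The invariant behind this ticket can be sketched as a constructor-time guard. This is a stripped-down, hypothetical model: `CandidateModel`, its `Vote` enum, and its methods are illustrative names and are not the actual `CandidateState` code. It only shows why the local id must be in the voter set before the candidate records its own vote.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a candidate's vote bookkeeping; not the real CandidateState.
final class CandidateModel {
    enum Vote { UNRECORDED, GRANTED, REJECTED }

    private final Map<Integer, Vote> voteStates = new HashMap<>();

    CandidateModel(int localId, Set<Integer> voterIds) {
        // Invariant: only a voter may become a candidate.
        if (!voterIds.contains(localId)) {
            throw new IllegalArgumentException(
                "Local replica " + localId + " is not in the voter set " + voterIds);
        }
        for (int voterId : voterIds) {
            voteStates.put(voterId, Vote.UNRECORDED);
        }
        // Safe because of the guard above: the entry for localId always exists.
        voteStates.put(localId, Vote.GRANTED);
    }

    Vote voteOf(int replicaId) {
        return voteStates.get(replicaId);
    }

    // A candidate wins once a strict majority of voters has granted its vote.
    boolean isVoteGranted() {
        long granted = voteStates.values().stream().filter(v -> v == Vote.GRANTED).count();
        return granted >= voteStates.size() / 2 + 1;
    }
}
```

In this model, a replica outside the voter set fails fast in the constructor instead of hitting a missing map entry later, which is the failure mode the invariant is meant to rule out.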
Re: [PR] KAFKA-16529; Implement raft response handling [kafka]
jsancio commented on code in PR #16454: URL: https://github.com/apache/kafka/pull/16454#discussion_r1663336934 ## raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java: ## @@ -58,193 +60,229 @@ private CandidateState newCandidateState(VoterSet voters) { ); } -@Test -public void testSingleNodeQuorum() { -CandidateState state = newCandidateState(voterSetWithLocal(IntStream.empty())); +@ParameterizedTest +@ValueSource(booleans = { true, false }) Review Comment: Looks like Kafka's checkstyle accepts both (`{ true, false }` and `{true, false}`). If we want to change this, do you mind if we do this in another PR? I use this style in a lot of places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1663330737 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * A {@code RangeSet} represents a range of integers from {@code from} (inclusive) + * to {@code to} (exclusive). + * This implementation provides a view over a continuous range of integers without actually storing them. + */ +public class RangeSet implements Set<Integer> { +private final int from; +private final int to; + +/** + * Constructs a {@code RangeSet} with the specified range. + * + * @param from The starting value (inclusive) of the range. + * @param to The ending value (exclusive) of the range. 
+ */ +public RangeSet(int from, int to) { +this.from = from; +this.to = to; +} + +@Override +public int size() { +return to - from; +} + +@Override +public boolean isEmpty() { +return size() == 0; +} + +@Override +public boolean contains(Object o) { +if (o instanceof Integer) { +int value = (Integer) o; +return value >= from && value < to; +} +return false; +} + +@Override +public Iterator<Integer> iterator() { +return new Iterator<Integer>() { +private int current = from; + +@Override +public boolean hasNext() { +return current < to; +} + +@Override +public Integer next() { +if (!hasNext()) throw new NoSuchElementException(); +return current++; +} +}; +} + +@Override +public Object[] toArray() { +throw new UnsupportedOperationException(); +} + +@Override +public <T> T[] toArray(T[] a) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean add(Integer integer) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean remove(Object o) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean containsAll(Collection<?> c) { +for (Object o : c) { +if (!contains(o)) return false; +} +return true; +} + +@Override +public boolean addAll(Collection<? extends Integer> c) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean retainAll(Collection<?> c) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean removeAll(Collection<?> c) { +throw new UnsupportedOperationException(); +} + +@Override +public void clear() { +throw new UnsupportedOperationException(); +} + +@Override +public String toString() { +StringBuilder sb = new StringBuilder(); +sb.append("["); +for (int i = from; i < to; i++) { +sb.append(i); +if (i < to - 1) { +sb.append(", "); +} +} +sb.append("]"); +return sb.toString(); Review Comment: Okie I changed it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
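For comparison, here is a minimal sketch of the same kind of read-only range view built on `java.util.AbstractSet`. It is not the code from the PR, and `RangeView` is a hypothetical name. The assumption being illustrated is that inheriting from `AbstractSet` provides `equals`, `hashCode`, `containsAll`, and `toArray` for free, and that `add` throws `UnsupportedOperationException` by default, so a range would compare equal to a `HashSet` holding the same integers without copying both sides first.

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical read-only view over the integers in [from, to); not the PR's RangeSet.
final class RangeView extends AbstractSet<Integer> {
    private final int from;
    private final int to;

    RangeView(int from, int to) {
        if (to < from) throw new IllegalArgumentException("to must be >= from");
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        return to - from;
    }

    // O(1) membership check instead of the inherited linear scan.
    @Override
    public boolean contains(Object o) {
        if (!(o instanceof Integer)) return false;
        int value = (Integer) o;
        return value >= from && value < to;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;

            @Override
            public boolean hasNext() {
                return current < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current++;
            }
        };
    }

    // Compact form that does not grow with the size of the range.
    @Override
    public String toString() {
        return "RangeView(from=" + from + ", to=" + to + ")";
    }
}
```

The trade-off in this sketch is that the inherited `equals` and `hashCode` iterate over the whole range, which is fine for tests but may matter for very large ranges.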
Re: [PR] KAFKA-16529; Implement raft response handling [kafka]
jsancio commented on code in PR #16454: URL: https://github.com/apache/kafka/pull/16454#discussion_r1663328497 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -1895,12 +2021,14 @@ private Optional<Boolean> maybeHandleCommonResponse( } } else if (error == Errors.BROKER_NOT_AVAILABLE) { return Optional.of(false); -} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL) { -// For now we treat this as a fatal error. Once we have support for quorum -// reassignment, this error could suggest that either we or the recipient of -// the request just has stale voter information, which means we can retry -// after backing off. -throw new IllegalStateException("Received error indicating inconsistent voter sets"); +} else if (error == Errors.INVALID_VOTER_KEY) { +// The voter key in the request for VOTE and BEGIN_QUORUM_EPOCH doesn't match the +// receiver's replica key +logger.info( Review Comment: I added the node information of the receiver (sender of the response). At this point in the code we don't have easy access to the voter key (`ReplicaKey`) sent in the request. To get access to it I would have to extend the response wrapper (`RaftResponse.Inbound`) to include the original request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16529; Implement raft response handling [kafka]
jsancio commented on code in PR #16454: URL: https://github.com/apache/kafka/pull/16454#discussion_r1663323596 ## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ## @@ -147,14 +148,18 @@ public boolean isVoteRejected() { * rejected by this node */ public boolean recordGrantedVote(int remoteNodeId) { -State state = voteStates.get(remoteNodeId); -if (state == null) { +VoterState voterState = voteStates.get(remoteNodeId); +if (voterState == null) { throw new IllegalArgumentException("Attempt to grant vote to non-voter " + remoteNodeId); -} else if (state == State.REJECTED) { +} else if (voterState.state() == State.REJECTED) { Review Comment: Fixed. Limited the changes to only this file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
C0urante commented on PR #15302: URL: https://github.com/apache/kafka/pull/15302#issuecomment-2204763797 @gharris1727 sorry for the delay. You're correct that this PR did not actually address KAFKA-15524, and I'm honestly not sure how I missed that the first time around. I've updated the description to only mention KAFKA-15917, added some useful info to assertion messages when sink connectors fail to catch up to the expected committed offsets in these tests, and rebased on the latest trunk. Would you mind giving this another pass when you have a moment? Currently this test is the flakiest of all Connect integration tests, failing nearly 10% of the time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
C0urante commented on code in PR #15302: URL: https://github.com/apache/kafka/pull/15302#discussion_r1663319861 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -801,6 +806,11 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1, "Connector tasks did not start in time."); +// Make sure the tasks' consumers have had a chance to actually form a group +// (otherwise, the reset request will succeed because there won't be any active consumers) +verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION, Review Comment: Great point, I've removed mention of KAFKA-15524 from this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17064) New consumer assign should update assignment in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862619#comment-17862619 ] PoAn Yang commented on KAFKA-17064: --- Hi [~lianetm], thanks for your explanation. If you're not working on this, may I assign the issue to myself? I can handle it after [https://github.com/apache/kafka/pull/16449] is merged. Thank you. > New consumer assign should update assignment in background thread > - > > Key: KAFKA-17064 > URL: https://issues.apache.org/jira/browse/KAFKA-17064 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.9.0 > > > With the new async consumer, the subscriptionState object is shared between > the app thread and the background thread, but in principle all updates to the > assignment should happen in the background thread, to avoid race conditions. > Note that it's in the background where most updates to the assignment happen, > as a result of app event processing (unsubscribe, reconciliations, etc.). > We've faced such races in places like unsubscribe and close, fixed by > ensuring that all assignment updates happen in the background, and this also > needs to be reviewed for the consumer.assign. The current implementation > triggers an AssignmentChange event that is processed in the background, but > that event is not really changing the assignment. It only commits offsets, > and the assignment is updated in the app thread by calling > subscriptionState.assignFromUser > We should consider moving the assignment update to the background thread, as > part of the AssignmentChangeEvent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
showuon commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204751198 @frankvicky, any thoughts about it? I'd also like to hear your opinion, since you are the author of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
showuon commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204748082 > 1. update docs to explain the possible error when you define the broker id for the zk broker which is using broker.id.generation.enable If my understanding is correct, the possible error is that the broker id will be ignored and the generated ID might exceed the max id range, which causes the error reported in JIRA. Is that right? > 2. in order to minimize the changes, we can update both broker.id and node.id together. The side effect is KafkaConfig#nodeId will be a mutable variable. IMO, this should be treated as a bug. We should propagate the generated ID to node.id. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1663307155 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -65,191 +94,233 @@ public String name() { } /** - * Pair of memberId and remaining partitions to meet the quota. + * Metadata for a topic including partition and subscription details. */ -private static class MemberWithRemainingAssignments { +private static class TopicMetadata { +public final Uuid topicId; +public final int numPartitions; +public int numMembers; + +public int minQuota = -1; +public int extraPartitions = -1; +public int nextRange = 0; + +/** + * Constructs a new TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembers The number of subscribed members. + */ +private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) { +this.topicId = topicId; +this.numPartitions = numPartitions; +this.numMembers = numMembers; +} + /** - * Member Id. + * Factory method to create a TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembers The number of subscribed members. + * @return A new TopicMetadata instance. */ -private final String memberId; +public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) { +return new TopicMetadata(topicId, numPartitions, numMembers); +} /** - * Number of partitions required to meet the assignment quota. + * Computes the minimum partition quota per member and the extra partitions, if not already computed. */ -private final int remaining; +void maybeComputeQuota() { +// The minimum number of partitions each member should receive for a balanced assignment. +if (minQuota != -1) return; +minQuota = numPartitions / numMembers; + +// Extra partitions to be distributed one to each member. 
+extraPartitions = numPartitions % numMembers; +} -public MemberWithRemainingAssignments(String memberId, int remaining) { -this.memberId = memberId; -this.remaining = remaining; +@Override +public String toString() { +return "TopicMetadata{" + +"topicId=" + topicId + +", numPartitions=" + numPartitions + +", numMembers=" + numMembers + +", minQuota=" + minQuota + +", extraPartitions=" + extraPartitions + +", nextRange=" + nextRange + +'}'; } } /** - * Returns a map of topic Ids to a list of members subscribed to them, - * based on the given assignment specification and metadata. - * - * @param groupSpec The specification required for group assignments. - * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. - * @return A map of topic Ids to a list of member Ids subscribed to them. - * - * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. + * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics. + * Assignment will be co-partitioned when all the topics have an equal number of partitions. 
*/ -private Map> membersPerTopic( -final GroupSpec groupSpec, -final SubscribedTopicDescriber subscribedTopicDescriber -) { -Map> membersPerTopic = new HashMap<>(); - -if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { -Collection allMembers = groupSpec.memberIds(); -Collection topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) -.subscribedTopicIds(); - -for (Uuid topicId : topics) { -if (subscribedTopicDescriber.numPartitions(topicId) == -1) { -throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); -} -membersPerTopic.put(topicId, allMembers); +private GroupAssignment assignHomogeneousGroup( +GroupSpec groupSpec, +SubscribedTopicDescriber subscribedTopicDescriber +) throws PartitionAssignorException { +List memberIds = sortMemberIds(groupSpec); + +MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0)); +Set subscribedTopics = new
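For readers following the `TopicMetadata` hunk above, here is a minimal sketch of how `minQuota` and `extraPartitions` could turn into contiguous partition ranges. It is not the PR's code: the class `RangeQuotaSketch` and the method `rangesFor` are hypothetical, and the rule that the first `extraPartitions` members each take one extra partition is an assumption made for illustration. Sticky assignment and heterogeneous subscriptions are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the RangeAssignor code from the PR. */
final class RangeQuotaSketch {

    /**
     * Returns one {from, to} pair per member (to is exclusive), in member order.
     * Assumption: the first (numPartitions % numMembers) members get one extra partition.
     */
    static List<int[]> rangesFor(int numPartitions, int numMembers) {
        if (numMembers <= 0) {
            throw new IllegalArgumentException("numMembers must be positive");
        }
        int minQuota = numPartitions / numMembers;        // same arithmetic as maybeComputeQuota()
        int extraPartitions = numPartitions % numMembers;

        List<int[]> ranges = new ArrayList<>(numMembers);
        int nextRange = 0;                                // start of the next unassigned range
        for (int member = 0; member < numMembers; member++) {
            int quota = minQuota + (member < extraPartitions ? 1 : 0);
            ranges.add(new int[] {nextRange, nextRange + quota});
            nextRange += quota;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 members: minQuota = 2, extraPartitions = 1.
        for (int[] range : rangesFor(7, 3)) {
            System.out.println("[" + range[0] + ", " + range[1] + ")");
        }
    }
}
```

With 7 partitions and 3 members, the sketch yields the ranges [0, 3), [3, 5) and [5, 7), so every partition is covered exactly once.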
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663277435

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java: ##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} (inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers without actually storing them.
+ */
+public class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from  The starting value (inclusive) of the range.
+     * @param to    The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(Integer integer) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Integer> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = from; i < to; i++) {
+            sb.append(i);
+            if (i < to - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    /**
+     * Compares the specified object with this set for equality. Returns {@code true}
+     * if the specified object is also a set, the two sets have the same size, and
+     * every member of the specified set is contained in this set.
+     *
+     * @param o object to be compared for equality with this set
+     * @return {@code true} if the specified object is equal to this set
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof Set)) return false;
+
+        Set<?> otherSet = (Set<?>) o;
+        if (otherSet.size() != this.size()) return false;
+
+        for (int i = from; i < to; i++) {
+            if (!otherSet.contains(i)) return false;
+        }

Review Comment:
yess makes sense
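The thread above is about `equals` on a range-backed set view. As a point of comparison, here is a minimal sketch of a different technique from the one in the diff: extending `java.util.AbstractSet`, which already supplies `equals` and `hashCode` that follow the `Set` contract, so a range view compares equal to a `HashSet` with the same elements in both directions. The class names `IntRangeView` and `IntRangeViewDemo` are hypothetical and do not come from the PR.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

/** Hypothetical read-only view over [from, to); not the RangeSet class from the PR. */
final class IntRangeView extends AbstractSet<Integer> {
    private final int from;
    private final int to;

    IntRangeView(int from, int to) {
        if (to < from) throw new IllegalArgumentException("to must be >= from");
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        return to - from;
    }

    // Constant-time membership check instead of AbstractCollection's linear scan.
    @Override
    public boolean contains(Object o) {
        if (!(o instanceof Integer)) return false;
        int value = (Integer) o;
        return value >= from && value < to;
    }

    // The iterator does not override remove(), so the view stays read-only.
    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;

            @Override
            public boolean hasNext() {
                return current < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current++;
            }
        };
    }
}

final class IntRangeViewDemo {
    public static void main(String[] args) {
        Set<Integer> range = new IntRangeView(0, 3);
        Set<Integer> hash = new HashSet<>(Arrays.asList(0, 1, 2));
        System.out.println(range.equals(hash));                   // true
        System.out.println(hash.equals(range));                   // true
        System.out.println(range.hashCode() == hash.hashCode());  // true
    }
}
```

When equality is symmetric like this, a test may be able to compare a range-backed set with an expected `HashSet` directly, without copying both sides first. Whether that fits the PR's performance goals is a separate question this sketch does not address.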
Re: [PR] MINOR: Automatically await startup and cluster warmup in Connect EmbeddedKafkaCluster::start [kafka]
C0urante commented on PR #16327:
URL: https://github.com/apache/kafka/pull/16327#issuecomment-2204593040

This doesn't seem to have had a noticeable effect on test flakiness, going to abandon it.
Re: [PR] MINOR: Automatically await startup and cluster warmup in Connect EmbeddedKafkaCluster::start [kafka]
C0urante closed pull request #16327: MINOR: Automatically await startup and cluster warmup in Connect EmbeddedKafkaCluster::start
URL: https://github.com/apache/kafka/pull/16327
[jira] [Commented] (KAFKA-16986) After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes
[ https://issues.apache.org/jira/browse/KAFKA-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862607#comment-17862607 ]

Justine Olshan commented on KAFKA-16986:

I guess you may see this as we expire metadata after metadata.max.idle.ms (= 30). I wonder if that's what is happening.

> After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes
>
> Key: KAFKA-16986
> URL: https://issues.apache.org/jira/browse/KAFKA-16986
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 3.0.1, 3.6.1
> Reporter: Vinicius Vieira dos Santos
> Priority: Minor
> Attachments: image-2024-07-01-09-05-17-147.png, image.png
>
> When updating the Kafka broker from version 2.7.0 to 3.4.1, we noticed that the applications began to log the message "{*}Resetting the last seen epoch of partition PAYMENTS-0 to 0 since the associated topicId changed from null to szRLmiAiTs8Y0nI8b3Wz1Q{*}" constantly. From what I understand, this behavior is not expected, because the topic was not deleted and recreated, so it should simply use the cached data and not go through this client log line.
> We have some applications with around 15 topics and 40 partitions, which means around 600 log lines when metadata updates occur.
> The main thing for me is to know if this could indicate a problem, or if I can simply change the log level of the org.apache.kafka.clients.Metadata class to warn without worries.
>
> There are other reports of the same behavior, like this:
> [https://stackoverflow.com/questions/74652231/apache-kafka-resetting-the-last-seen-epoch-of-partition-why]
>
> *Some log occurrences over an interval of about 7 hours, each block refers to an instance of the application in kubernetes*
>
> !image.png!
> *My scenario:*
> *Application:*
> - Java: 21
> - Client: 3.6.1, also tested on 3.0.1 and has the same behavior
> *Broker:*
> - Cluster running on Kubernetes with the bitnami/kafka:3.4.1-debian-11-r52 image
>
> *Producer Config*
>
> acks = -1
> auto.include.jmx.reporter = true
> batch.size = 16384
> bootstrap.servers = [server:9092]
> buffer.memory = 33554432
> client.dns.lookup = use_all_dns_ips
> client.id = producer-1
> compression.type = gzip
> connections.max.idle.ms = 54
> delivery.timeout.ms = 3
> enable.idempotence = true
> interceptor.classes = []
> key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> linger.ms = 0
> max.block.ms = 6
> max.in.flight.requests.per.connection = 1
> max.request.size = 1048576
> metadata.max.age.ms = 30
> metadata.max.idle.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> partitioner.adaptive.partitioning.enable = true
> partitioner.availability.timeout.ms = 0
> partitioner.class = null
> partitioner.ignore.keys = false
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 3
> retries = 3
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = [hidden]
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 1
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = PLAIN
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 360
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 1
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = SASL_PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> socket.connection.setup.timeout.max.ms = 3
> socket.connection.setup.timeout.ms = 1
[jira] [Commented] (KAFKA-16986) After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes
[ https://issues.apache.org/jira/browse/KAFKA-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862606#comment-17862606 ]

Justine Olshan commented on KAFKA-16986:

Hey, thanks for taking a look at it. It is very strange that the same producer would log this error more than once since it means somehow the producer got metadata with a topic ID, updated it (first instance of message) and then somehow loses it again and upon getting it again logs again. If a producer is writing to the same topic consistently, I'm not sure how it could lose the topic ID in the metadata unless some brokers have metadata containing the topic ID and others do not.
Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]
bbejeck commented on PR #16503:
URL: https://github.com/apache/kafka/pull/16503#issuecomment-2204539517

Previous build failures unrelated. Kicking off the build one more time to confirm test successful.
Re: [PR] KAFKA-17047: Refactored group coordinator classes to modern package (KIP-932) [kafka]
apoorvmittal10 commented on PR #16474:
URL: https://github.com/apache/kafka/pull/16474#issuecomment-2204398771

Build passed with unrelated test failures.
Re: [PR] KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests [kafka]
apoorvmittal10 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1663148002

## core/src/main/scala/kafka/server/KafkaApis.scala: ##
@@ -3956,11 +3961,482 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not been set explicitly
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+    val topicNames = metadataCache.topicIdsToNames()
+    val sharePartitionManager : SharePartitionManager = sharePartitionManagerOption match {
+      case Some(manager) => manager
+      case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized")
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+    val clientId = request.header.clientId
+    val memberId = shareFetchRequest.data().memberId()
+    val shareSessionEpoch = shareFetchRequest.data().shareSessionEpoch()
+
+    val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+    val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+    var cachedTopicPartitions : util.List[TopicIdPartition] = null
+
+    if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+      try {
+        cachedTopicPartitions = sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId))

Review Comment:
@AndrewJSchofield @adixitconfluent @omkreddy Can I get review on this please: https://github.com/apache/kafka/pull/16513
[PR] KAFKA-16754: Removing partitions from release API (KIP-932) [kafka]
apoorvmittal10 opened a new pull request, #16513:
URL: https://github.com/apache/kafka/pull/16513

The release API exposed Partitions, which should be an internal implementation detail of the `releaseAcquiredRecords` API. Also lessen the scope of the cached topic partitions method, as it's not needed.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-17064) New consumer assign should update assignment in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862589#comment-17862589 ]

Lianet Magrans commented on KAFKA-17064:

Hey [~yangpoan], definitely, based on events that in the end will execute in the background thread. I see 2 cases:

1. If the assign has empty partitions, I would say we're covered: it relies on a call to unsubscribe, and that performs all assignment updates in the background already. We just need KAFKA-17017 to get it all right when the member is not in a group, sure thing.
2. If the assign has non-empty partitions, it will end up updating the assignment in the app thread, and we should fix that (this issue). We should rely on an event indeed, and we already have it actually: it's the AssignmentChangeEvent. The issue is that this event currently does only part of the job (it only triggers a commit https://github.com/apache/kafka/blob/c97d4ce026fdb75f80760bcce00b239db951a481/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L181). Misleading name indeed. We should consider having that event handle the full assignment change (commit and update the subscription state with assignFromUser), all in the background.

> New consumer assign should update assignment in background thread
>
> Key: KAFKA-17064
> URL: https://issues.apache.org/jira/browse/KAFKA-17064
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.8.0
> Reporter: Lianet Magrans
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
> With the new async consumer, the subscriptionState object is shared between the app thread and the background thread, but in principle all updates to the assignment should happen in the background thread, to avoid race conditions.
> Note that it's in the background where most updates to the assignment happen (as a result of app event processing like unsubscribe, reconciliations, etc.).
> We've faced such races in places like unsubscribe and close, fixed by ensuring that all assignment updates happen in the background, and this also needs to be reviewed for consumer.assign. The current implementation triggers an AssignmentChange event that is processed in the background, but that event is not really changing the assignment. It only commits offsets, and the assignment is updated in the app thread by calling subscriptionState.assignFromUser.
> We should consider moving the assignment update to the background thread, as part of the AssignmentChangeEvent.
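The direction discussed in this issue (one event that both commits and updates the assignment, processed only on the background thread) can be illustrated with a small self-contained sketch. None of the types below are Kafka classes: `BackgroundAssignSketch`, `AssignmentChange` and the thread name `background-sketch` are hypothetical, the "commit" is a counter, and partitions are plain strings. It only shows the threading shape, under the assumption that the app thread blocks until the event has been applied.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch; none of these types are Kafka classes. */
final class BackgroundAssignSketch {

    /** Event carrying the user-requested partitions; completed by the background thread. */
    static final class AssignmentChange {
        final Set<String> partitions;
        final CompletableFuture<Set<String>> applied = new CompletableFuture<>();

        AssignmentChange(Set<String> partitions) {
            this.partitions = new HashSet<>(partitions);
        }
    }

    private final BlockingQueue<AssignmentChange> events = new LinkedBlockingQueue<>();

    // Mutated only by the background thread, so no locking is needed around these.
    private Set<String> assignment = new HashSet<>();
    private int commits = 0;

    // Written by the background thread and read by the caller; volatile for visibility.
    volatile String lastUpdater = "";

    BackgroundAssignSketch() {
        Thread background = new Thread(this::processEvents, "background-sketch");
        background.setDaemon(true); // lets the JVM exit when the demo finishes
        background.start();
    }

    private void processEvents() {
        try {
            while (true) {
                AssignmentChange event = events.take();
                commits++;                      // stand-in for committing offsets
                assignment = event.partitions;  // stand-in for the assignFromUser step
                lastUpdater = Thread.currentThread().getName();
                event.applied.complete(Collections.unmodifiableSet(new HashSet<>(assignment)));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Called on the application thread: enqueue the event and wait for the applied result. */
    Set<String> assign(Set<String> partitions) {
        AssignmentChange event = new AssignmentChange(partitions);
        events.add(event);
        return event.applied.join();
    }

    public static void main(String[] args) {
        BackgroundAssignSketch consumer = new BackgroundAssignSketch();
        Set<String> result = consumer.assign(new HashSet<>(Arrays.asList("t-0", "t-1")));
        System.out.println(result.size() + " partitions applied by " + consumer.lastUpdater);
    }
}
```

The app thread never touches `assignment` directly; it only receives an immutable snapshot through the future, which is the property the issue is after.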
Re: [PR] Master fix downstream trigger [kafka]
Muen342 closed pull request #16512: Master fix downstream trigger URL: https://github.com/apache/kafka/pull/16512 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17017) AsyncConsumer#unsubscribe does not clean the assigned partitions
[ https://issues.apache.org/jira/browse/KAFKA-17017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17862584#comment-17862584 ] Lianet Magrans commented on KAFKA-17017: Hey [~yangpoan] / [~chia7712] , sorry I missed this, was out for a few days. Fair issue, the intention was not to change the behaviour, I agree with the gap reported. Thinking about an approach, I would imagine that we could simply delegate everything to the UnsubscribeEvent, meaning that the AsyncConsumer#unsubscribe would not bother checking if in group or not (no need for point 1 above), and we would just trigger the UnsubscribeEvent to address this situation in a single place (point 2 above only): {quote}2) `MembershipManagerImpl#leaveGroup` should call `subscriptions.unsubscribe` if the `isNotInGroup` is true {quote} Not sure if I'm missing a detail, but seems like a consistent/simple approach to solve this. The membershipMgr already calls subscriptions.unsubscribe when it leaves the group (on the leaveGroup()), so we would just complete the story to cover the not in group case, but all handled in the same component. > AsyncConsumer#unsubscribe does not clean the assigned partitions > > > Key: KAFKA-17017 > URL: https://issues.apache.org/jira/browse/KAFKA-17017 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Major > Labels: kip-848-client-support > > According to docs [0] `Consumer#unsubscribe` should clean both subscribed and > assigned partitions. 
However, there are two issues about `AsyncConsumer` > 1) if we don't set group id, `AsyncConsumer#unsubscribe`[1] will be no-op > 2) if we set group id, `AsyncConsumer` is always in `UNSUBSCRIBED` state and > so `MembershipManagerImpl#leaveGroup`[2] will be no-op > [0] > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L759 > [1] > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1479 > [2] > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L666 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Master fix downstream trigger [kafka]
Muen342 opened a new pull request, #16512: URL: https://github.com/apache/kafka/pull/16512 fix map to correct branches https://github.com/confluentinc/kafka/pull/1204/files#diff-e6ffa5dc854b843b3ee3c3c28f8eae2f436c2df2b1ca299cca1fa5982e390cf8L54-L61 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-17066: --- Priority: Blocker (was: Major) > New consumer updateFetchPositions should perform all operations in background > thread > > > Key: KAFKA-17066 > URL: https://issues.apache.org/jira/browse/KAFKA-17066 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Priority: Blocker > Fix For: 3.9.0 > > > The updateFetchPositions function in the new consumer performs several actions > based on the assigned partitions from the subscriptionState. The way it's > currently implemented, it fetches committed offsets for partitions that > required a position (retrieved from subscription state in the app thread), > and then resets positions for the partitions still needing one (retrieved > from the subscription state but in the background thread). > This is problematic, given that the assignment/subscriptionState may change > in the background thread at any time (e.g. new partitions reconciled), so we > could end up resetting positions to the partition offsets for a partition for > which we never even attempted to retrieve committed offsets. > Consider this sequence for a consumer that owns a partition tp0: > * consumer owns tp0 > * app thread -> updateFetchPositions triggers > initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned > partitions requiring a position (taking them from > subscriptions.initializingPartitions()). This will fetch committed offsets > for tp0 only. > * background thread -> receives new partition tp1 and completes reconciliation > (adds it to the subscription state as INITIALIZING, requires a position) > * app thread -> updateFetchPositions resets positions for all partitions > that still don't have a valid position after initWithCommittedOffsetsIfNeeded > (taking them from subscriptionState.partitionsNeedingReset). 
This will > mistakenly consider that it should reset tp1 to the partition offsets, when > in reality it never even tried fetching the committed offsets for it because > it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. > We should consider moving the updateFetchPositions as a single event to the > background, that would safely use the subscriptionState object and apply all > actions involved in the updateFetchPositions to the same consistent set of > partitions assigned at that moment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
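The sequence in this issue can be reproduced with a small single-threaded model. The sketch below is an assumption-laden illustration, not Kafka's `SubscriptionState`: `PositionUpdateSketch` and its fields are hypothetical, the `betweenSteps` hook simulates the background thread changing the assignment between the two steps, and the model assumes no committed offsets exist so that every partition still needs a reset. Taking one snapshot stands in for running both steps as a single background event.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of the two-step position update; not Kafka code.
final class PositionUpdateSketch {
    // Partitions that still need a position.
    final Set<String> initializing = new LinkedHashSet<>();
    // Partitions for which a committed-offset fetch was attempted.
    final Set<String> fetchAttempted = new HashSet<>();
    // Partitions whose position was reset to the partition offsets.
    final Set<String> reset = new HashSet<>();

    // Racy variant: each step reads the shared state on its own, so the
    // assignment can change in between (simulated by betweenSteps).
    void racyUpdate(Runnable betweenSteps) {
        fetchAttempted.addAll(initializing); // step 1: fetch committed offsets
        betweenSteps.run();                  // e.g. tp1 gets reconciled here
        reset.addAll(initializing);          // step 2: reset whatever needs a position now
    }

    // Consistent variant: both steps operate on the same snapshot of partitions.
    void consistentUpdate(Runnable betweenSteps) {
        Set<String> snapshot = new LinkedHashSet<>(initializing);
        fetchAttempted.addAll(snapshot);
        betweenSteps.run();
        reset.addAll(snapshot);
    }
}
```

With `initializing = {tp0}` and a hook that adds `tp1`, the racy variant resets `tp1` without ever attempting a committed-offset fetch for it, while the consistent variant leaves `tp1` for the next update.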
[jira] [Created] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread
Lianet Magrans created KAFKA-17066: -- Summary: New consumer updateFetchPositions should perform all operations in background thread Key: KAFKA-17066 URL: https://issues.apache.org/jira/browse/KAFKA-17066 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.8.0 Reporter: Lianet Magrans Fix For: 3.9.0 The updateFetchPositions function in the new consumer performs several actions based on the assigned partitions from the subscriptionState. The way it's currently implemented, it fetches committed offsets for partitions that required a position (retrieved from subscription state in the app thread), and then resets positions for the partitions still needing one (retrieved from the subscription state but in the background thread). This is problematic, given that the assignment/subscriptionState may change in the background thread at any time (e.g. new partitions reconciled), so we could end up resetting positions to the partition offsets for a partition for which we never even attempted to retrieve committed offsets. Consider this sequence for a consumer that owns a partition tp0: * consumer owns tp0 * app thread -> updateFetchPositions triggers initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned partitions requiring a position (taking them from subscriptions.initializingPartitions()). This will fetch committed offsets for tp0 only. * background thread -> receives new partition tp1 and completes reconciliation (adds it to the subscription state as INITIALIZING, requires a position) * app thread -> updateFetchPositions resets positions for all partitions that still don't have a valid position after initWithCommittedOffsetsIfNeeded (taking them from subscriptionState.partitionsNeedingReset). This will mistakenly consider that it should reset tp1 to the partition offsets, when in reality it never even tried fetching the committed offsets for it because it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 
We should consider moving the updateFetchPositions as a single event to the background, that would safely use the subscriptionState object and apply all actions involved in the updateFetchPositions to the same consistent set of partitions assigned at that moment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17032: NioEchoServer should generate meaningful id instead of incremental number. [kafka]
chia7712 merged PR #16460: URL: https://github.com/apache/kafka/pull/16460
[jira] [Resolved] (KAFKA-17032) NioEchoServer should generate meaningful id instead of incremential number
[ https://issues.apache.org/jira/browse/KAFKA-17032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17032. Fix Version/s: 3.9.0 Resolution: Fixed > NioEchoServer should generate meaningful id instead of incremential number > -- > > Key: KAFKA-17032 > URL: https://issues.apache.org/jira/browse/KAFKA-17032 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > Fix For: 3.9.0 > > > see discussion: > https://github.com/apache/kafka/pull/16384#issuecomment-2187071751
Re: [PR] KAFKA-16806: Explicitly declare JUnit dependencies for all test modules [kafka]
chia7712 commented on code in PR #16301: URL: https://github.com/apache/kafka/pull/16301#discussion_r1663048605 ## build.gradle: ## @@ -35,6 +35,7 @@ plugins { id 'idea' id 'jacoco' id 'java-library' + id 'jvm-test-suite' Review Comment: We have already introduced a dependency on the JUnit platform (https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L204), so maybe we can declare the dependencies manually?
[PR] KAFKA-16106: revert classic state transitions if deletion fails [kafka]
jeffkbkim opened a new pull request, #16511: URL: https://github.com/apache/kafka/pull/16511 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-16550) add integration test for LogDirsCommand
[ https://issues.apache.org/jira/browse/KAFKA-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16550. Fix Version/s: 3.9.0 Resolution: Fixed > add integration test for LogDirsCommand > --- > > Key: KAFKA-16550 > URL: https://issues.apache.org/jira/browse/KAFKA-16550 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > Fix For: 3.9.0 > > > Currently LogDirsCommand has only unit tests (UT)
Re: [PR] KAFKA-16550: add integration test for LogDirsCommand [kafka]
chia7712 merged PR #16485: URL: https://github.com/apache/kafka/pull/16485
Re: [PR] KAFKA-17032: NioEchoServer should generate meaningful id instead of incremental number. [kafka]
chia7712 commented on PR #16460: URL: https://github.com/apache/kafka/pull/16460#issuecomment-2204075651 @gharris1727 any feedback?
[jira] [Resolved] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version
[ https://issues.apache.org/jira/browse/KAFKA-17051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17051. Fix Version/s: 3.9.0 Resolution: Fixed > ApiKeys#toHtml should exclude the APIs having unstable latest version > - > > Key: KAFKA-17051 > URL: https://issues.apache.org/jira/browse/KAFKA-17051 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: 黃竣陽 >Priority: Minor > Fix For: 3.9.0 > > > see the discussion: > https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o > The (released) docs should show only the APIs which are ready to be exposed > publicly. Those APIs having "latestVersionUnstable=true" should be > excluded.
Re: [PR] KAFKA-17051: ApiKeys#toHtml should exclude the APIs having unstable latest version [kafka]
chia7712 merged PR #16480: URL: https://github.com/apache/kafka/pull/16480
[jira] [Resolved] (KAFKA-3346) Rename "Mode" to "SslMode"
[ https://issues.apache.org/jira/browse/KAFKA-3346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-3346. --- Fix Version/s: 3.9.0 Resolution: Fixed > Rename "Mode" to "SslMode" > -- > > Key: KAFKA-3346 > URL: https://issues.apache.org/jira/browse/KAFKA-3346 > Project: Kafka > Issue Type: Bug > Components: security >Reporter: Gwen Shapira >Assignee: Ksolves >Priority: Major > Fix For: 3.9.0 > > > In the channel builders, the Mode enum is undocumented, so it is unclear that > it is used to signify whether the connection is for SSL client or SSL server. > I suggest renaming to SslMode (although adding documentation will be ok too, > if people object to the rename)
Re: [PR] KAFKA-3346: Refactor & Rename Mode to ConnectionMode [kafka]
chia7712 commented on PR #16403: URL: https://github.com/apache/kafka/pull/16403#issuecomment-2204063329 @abhi-ksolves thanks for this contribution
Re: [PR] KAFKA-3346: Refactor & Rename Mode to ConnectionMode [kafka]
chia7712 merged PR #16403: URL: https://github.com/apache/kafka/pull/16403
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
chia7712 commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2204040094 > Thus, I don't think it will cause the error as described in the JIRA, right? I still don't understand in what circumstance this issue will happen. It might need more explanation. Thanks. Thanks for this response; it inspired me to dig into the root cause. The issue happens when the zk broker is using a generated broker id and is being migrated. 1. If we define the broker id in the config file, the broker id is viewed as invalid since it is in the reserved range (when `broker.id.generation.enable=true`). 2. If we don't define the broker id in the config file, the broker id will be evaluated according to `meta.properties` [0]. However, we forgot to update `node.id`, so the kraft client will be built with "node.id=-1" [1], which results in an error. [0] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L261 [1] https://github.com/apache/kafka/blob/e55c28c60b0f021b12c93fa04e360ce7a2e0a5ac/core/src/main/scala/kafka/raft/RaftManager.scala#L230 In short, we don't consider the use case of "generated broker id (the broker id is omitted)" + "zk migration". It seems we need two fixes: 1. Update the docs to explain the possible error when you define the broker id for a zk broker which is using `broker.id.generation.enable`. 2. In order to minimize the changes, we can update both `broker.id` and `node.id` together. The side effect is that `KafkaConfig#nodeId` will be a mutable variable. @showuon WDYT?
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662970452 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -3839,6 +3842,209 @@ public void close() {} assertEquals("response1", write.get(5, TimeUnit.SECONDS)); } +@Test +public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(Duration.ofMillis(20)) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.withSerializer(new StringSerializer()) +.withAppendLingerMs(10) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); +assertNull(ctx.currentBatch); + +// Get the max batch size. +int maxBatchSize = writer.config(TP).maxMessageSize(); + +// Create records with a quarter of the max batch size each. Keep in mind that +// each batch has a header so it is not possible to have those four records +// in one single batch. 
+List records = Stream.of('1', '2', '3', '4').map(c -> { +char[] payload = new char[maxBatchSize / 4]; +Arrays.fill(payload, c); +return new String(payload); +}).collect(Collectors.toList()); + +// Let's try to write all the records atomically (the default) to ensure +// that it fails. +CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#1") +); + +assertFutureThrows(write1, RecordTooLargeException.class); + +// Let's try to write the same records non-atomically. +CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#2", null, true, false) +); + +// The write is pending. +assertFalse(write2.isDone()); + +// Verify the state. +assertNotNull(ctx.currentBatch); +// The last written offset is 3L because one batch was written to the log with +// the first three records. The 4th one is pending. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) +), ctx.coordinator.coordinator().fullRecords()); +assertEquals(Collections.singletonList( +records(timer.time().milliseconds(), records.subList(0, 3)) +), writer.entries(TP)); + +// Commit up to 3L. +writer.commit(TP, 3L); + +// The write is still pending. +assertFalse(write2.isDone()); + +// Advance past the linger time to flush the pending batch. +timer.advanceClock(11); + +// Verify the state. 
+assertNull(ctx.currentBatch); +assertEquals(4L, ctx.coordinator.lastWrittenOffset()); +assertEquals(3L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
jolshan commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662961495 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -3839,6 +3842,209 @@ public void close() {} assertEquals("response1", write.get(5, TimeUnit.SECONDS)); } +@Test +public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(Duration.ofMillis(20)) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.withSerializer(new StringSerializer()) +.withAppendLingerMs(10) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); +assertNull(ctx.currentBatch); + +// Get the max batch size. +int maxBatchSize = writer.config(TP).maxMessageSize(); + +// Create records with a quarter of the max batch size each. Keep in mind that +// each batch has a header so it is not possible to have those four records +// in one single batch. 
+List records = Stream.of('1', '2', '3', '4').map(c -> { +char[] payload = new char[maxBatchSize / 4]; +Arrays.fill(payload, c); +return new String(payload); +}).collect(Collectors.toList()); + +// Let's try to write all the records atomically (the default) to ensure +// that it fails. +CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#1") +); + +assertFutureThrows(write1, RecordTooLargeException.class); + +// Let's try to write the same records non-atomically. +CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#2", null, true, false) +); + +// The write is pending. +assertFalse(write2.isDone()); + +// Verify the state. +assertNotNull(ctx.currentBatch); +// The last written offset is 3L because one batch was written to the log with +// the first three records. The 4th one is pending. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) +), ctx.coordinator.coordinator().fullRecords()); +assertEquals(Collections.singletonList( +records(timer.time().milliseconds(), records.subList(0, 3)) +), writer.entries(TP)); + +// Commit up to 3L. +writer.commit(TP, 3L); + +// The write is still pending. +assertFalse(write2.isDone()); + +// Advance past the linger time to flush the pending batch. +timer.advanceClock(11); + +// Verify the state. 
+assertNull(ctx.currentBatch); +assertEquals(4L, ctx.coordinator.lastWrittenOffset()); +assertEquals(3L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
dajac commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1662942561 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * A {@code RangeSet} represents a range of integers from {@code from} (inclusive) + * to {@code to} (exclusive). + * This implementation provides a view over a continuous range of integers without actually storing them. + */ +public class RangeSet implements Set { +private final int from; +private final int to; + +/** + * Constructs a {@code RangeSet} with the specified range. + * + * @param from The starting value (inclusive) of the range. + * @param toThe ending value (exclusive) of the range. 
+ */ +public RangeSet(int from, int to) { +this.from = from; +this.to = to; +} + +@Override +public int size() { +return to - from; +} + +@Override +public boolean isEmpty() { +return size() == 0; +} + +@Override +public boolean contains(Object o) { +if (o instanceof Integer) { +int value = (Integer) o; +return value >= from && value < to; +} +return false; +} + +@Override +public Iterator iterator() { +return new Iterator() { +private int current = from; + +@Override +public boolean hasNext() { +return current < to; +} + +@Override +public Integer next() { +if (!hasNext()) throw new NoSuchElementException(); +return current++; +} +}; +} + +@Override +public Object[] toArray() { +throw new UnsupportedOperationException(); +} + +@Override +public T[] toArray(T[] a) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean add(Integer integer) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean remove(Object o) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean containsAll(Collection c) { +for (Object o : c) { +if (!contains(o)) return false; +} +return true; +} + +@Override +public boolean addAll(Collection c) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean retainAll(Collection c) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean removeAll(Collection c) { +throw new UnsupportedOperationException(); +} + +@Override +public void clear() { +throw new UnsupportedOperationException(); +} + +@Override +public String toString() { +StringBuilder sb = new StringBuilder(); +sb.append("["); +for (int i = from; i < to; i++) { +sb.append(i); +if (i < to - 1) { +sb.append(", "); +} +} +sb.append("]"); +return sb.toString(); Review Comment: The short version seems better to me but it may be a personal preference. I’d also like to have the type printed out. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
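To illustrate the review comment on `toString` (a short form that also prints the type), here is a minimal sketch. It is not the PR's code: `CompactRangeSet` is a hypothetical stand-in built on `AbstractSet`, and the exact output format is an assumption chosen for the example.

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the RangeSet under review; only what the example needs.
final class CompactRangeSet extends AbstractSet<Integer> {
    private final int from; // inclusive
    private final int to;   // exclusive

    CompactRangeSet(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        return to - from;
    }

    @Override
    public boolean contains(Object o) {
        if (o instanceof Integer) {
            int value = (Integer) o;
            return value >= from && value < to;
        }
        return false;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;

            @Override
            public boolean hasNext() {
                return current < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current++;
            }
        };
    }

    // Short form: prints the type and the bounds instead of every element,
    // so the string stays constant-size regardless of the range length.
    @Override
    public String toString() {
        return "RangeSet(from=" + from + " (inclusive), to=" + to + " (exclusive))";
    }
}
```

Because the sketch extends `AbstractSet`, `equals` compares by size and membership, so a range still compares equal to a `HashSet` holding the same integers; whether the real class should behave that way is a separate design decision.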
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
chia7712 commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1662903374 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) +.forEach(conf -> conf.errorMessages().add(errorMsg))); Review Comment: could you please use `addErrorMessage` instead of `errorMessages().add`? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: `EMIT_OFFSET_SYNCS_ENABLED` is in `MirrorSourceConfig`, so it is not a part of config def of `MirrorCheckpointConnector`. Hence, the error related to `EMIT_OFFSET_SYNCS_ENABLED` can't be propagated. 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -234,6 +234,30 @@ public ConfigDef config() { @Override public org.apache.kafka.common.config.Config validate(Map props) { List configValues = super.validate(props).configValues(); +validateExactlyOnceConfigs(props, configValues); +validateEmitOffsetSyncConfigs(props, configValues); + +return new org.apache.kafka.common.config.Config(configValues); +} + +private static void validateEmitOffsetSyncConfigs(Map props, List configValues) { +boolean offsetSyncsConfigured = props.keySet().stream() +.anyMatch(conf -> conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX)); + +if ("false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && offsetSyncsConfigured) { Review Comment: what if `EMIT_OFFSET_SYNCS_ENABLED=true` and `offsetSyncsConfigured=false`? Should we add error message for it? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> +configValues.stream() +.filter(conf -> conf.name().equals(config)) Review Comment: The following test shows my concern: ```java @Test public void test() { Map props = new HashMap<>(); props.put(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, "false"); MirrorCheckpointConnector connector = new MirrorCheckpointConnector(); Config config = connector.validate(props); assertEquals(1, config.configValues().stream().filter(c -> c.name().equals(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).count()); } ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
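The propagation concern raised above can be reproduced without Kafka on the classpath. The sketch below uses a hypothetical `Value` class as a stand-in for a validated config entry (it is not Kafka's `ConfigValue`), and `attachOrCreate` is only one possible remedy, not necessarily what the PR adopted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ValidatePropagationSketch {
    // Minimal stand-in for a validated config entry (hypothetical type).
    static final class Value {
        final String name;
        final List<String> errors = new ArrayList<>();

        Value(String name) {
            this.name = name;
        }

        void addErrorMessage(String msg) {
            errors.add(msg);
        }
    }

    // Mirrors the filter-based approach in the diff: an error is attached only
    // if an entry with the same name already exists; otherwise it is dropped.
    static void attachIfPresent(List<Value> values, Map<String, String> errors) {
        errors.forEach((name, msg) -> values.stream()
            .filter(v -> v.name.equals(name))
            .forEach(v -> v.addErrorMessage(msg)));
    }

    // One possible alternative (an assumption, not the PR's code): create the
    // missing entry so the error is still reported to the caller.
    static void attachOrCreate(List<Value> values, Map<String, String> errors) {
        errors.forEach((name, msg) -> {
            Value target = values.stream()
                .filter(v -> v.name.equals(name))
                .findFirst()
                .orElseGet(() -> {
                    Value created = new Value(name);
                    values.add(created);
                    return created;
                });
            target.addErrorMessage(msg);
        });
    }
}
```

With `attachIfPresent`, an error keyed by a config name that is absent from the connector's own config definition silently disappears, which is the behaviour the test in the review comment is probing.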
Re: [PR] KAFKA-16934: Clean up and refactor release.py [kafka]
jlprat commented on PR #16287: URL: https://github.com/apache/kafka/pull/16287#issuecomment-2203894286 I wanted to use this script to release 3.8 now that it is finally unblocked.
Re: [PR] Revert "KAFKA-16154" and mark MV 3.8-IV0 as the latest production [kafka]
jlprat commented on PR #16400: URL: https://github.com/apache/kafka/pull/16400#issuecomment-2203893011 Merged! Thanks for the reviews!
Re: [PR] Revert "KAFKA-16154" and mark MV 3.8-IV0 as the latest production [kafka]
jlprat merged PR #16400: URL: https://github.com/apache/kafka/pull/16400
[jira] [Updated] (KAFKA-17064) New consumer assign should update assignment in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17064: -- Component/s: clients consumer > New consumer assign should update assignment in background thread > - > > Key: KAFKA-17064 > URL: https://issues.apache.org/jira/browse/KAFKA-17064 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Affects Versions: 3.8.0 > Reporter: Lianet Magrans > Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.9.0 > > > With the new async consumer, the subscriptionState object is shared between > the app thread and the background thread, but in principle all updates to the > assignment should happen in the background thread, to avoid race conditions. > Note that it's in the background where most updates to the assignment happen > (as a result of app event processing like unsubscribe, reconciliations, etc.). > We've faced such races in places like unsubscribe and close, fixed by > ensuring that all assignment updates happen in the background, and this also > needs to be reviewed for consumer.assign. The current implementation > triggers an AssignmentChange event that is processed in the background, but > that event is not really changing the assignment. It only commits offsets, > and the assignment is updated in the app thread by calling > subscriptionState.assignFromUser. > We should consider moving the assignment update to the background thread, as > part of the AssignmentChangeEvent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
dajac commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1662897722 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -65,191 +94,233 @@ public String name() { } /** - * Pair of memberId and remaining partitions to meet the quota. + * Metadata for a topic including partition and subscription details. */ -private static class MemberWithRemainingAssignments { +private static class TopicMetadata { +public final Uuid topicId; +public final int numPartitions; +public int numMembers; + +public int minQuota = -1; +public int extraPartitions = -1; +public int nextRange = 0; + +/** + * Constructs a new TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembersThe number of subscribed members. + */ +private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) { +this.topicId = topicId; +this.numPartitions = numPartitions; +this.numMembers = numMembers; +} + /** - * Member Id. + * Factory method to create a TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembersThe number of subscribed members. + * @return A new TopicMetadata instance. */ -private final String memberId; +public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) { +return new TopicMetadata(topicId, numPartitions, numMembers); +} /** - * Number of partitions required to meet the assignment quota. + * Computes the minimum partition quota per member and the extra partitions, if not already computed. */ -private final int remaining; +void maybeComputeQuota() { +// The minimum number of partitions each member should receive for a balanced assignment. +if (minQuota != -1) return; +minQuota = numPartitions / numMembers; + +// Extra partitions to be distributed one to each member. 
+extraPartitions = numPartitions % numMembers; +} -public MemberWithRemainingAssignments(String memberId, int remaining) { -this.memberId = memberId; -this.remaining = remaining; +@Override +public String toString() { +return "TopicMetadata{" + +"topicId=" + topicId + +", numPartitions=" + numPartitions + +", numMembers=" + numMembers + +", minQuota=" + minQuota + +", extraPartitions=" + extraPartitions + +", nextRange=" + nextRange + +'}'; } } /** - * Returns a map of topic Ids to a list of members subscribed to them, - * based on the given assignment specification and metadata. - * - * @param groupSpec The specification required for group assignments. - * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. - * @return A map of topic Ids to a list of member Ids subscribed to them. - * - * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. + * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics. + * Assignment will be co-partitioned when all the topics have an equal number of partitions. 
*/ -private Map> membersPerTopic( -final GroupSpec groupSpec, -final SubscribedTopicDescriber subscribedTopicDescriber -) { -Map> membersPerTopic = new HashMap<>(); - -if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { -Collection allMembers = groupSpec.memberIds(); -Collection topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) -.subscribedTopicIds(); - -for (Uuid topicId : topics) { -if (subscribedTopicDescriber.numPartitions(topicId) == -1) { -throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); -} -membersPerTopic.put(topicId, allMembers); +private GroupAssignment assignHomogeneousGroup( +GroupSpec groupSpec, +SubscribedTopicDescriber subscribedTopicDescriber +) throws PartitionAssignorException { +List memberIds = sortMemberIds(groupSpec); + +MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0)); +Set subscribedTopics = new
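The quota arithmetic in `maybeComputeQuota` (`minQuota = numPartitions / numMembers`, `extraPartitions = numPartitions % numMembers`) can be illustrated with a small standalone sketch. It assumes that the extra partitions go to the first members in sorted order, one each, and that ranges are handed out contiguously; the class and method names are hypothetical, and the real assignor also handles existing assignments and multiple topics.

```java
import java.util.ArrayList;
import java.util.List;

final class RangeQuotaSketch {
    // Returns one [from, to) range per member, in member order.
    static List<int[]> ranges(int numPartitions, int numMembers) {
        if (numMembers <= 0) throw new IllegalArgumentException("numMembers must be positive");

        // Minimum number of partitions each member receives.
        int minQuota = numPartitions / numMembers;
        // Leftover partitions, handed out one each to the first members (assumption).
        int extraPartitions = numPartitions % numMembers;

        List<int[]> result = new ArrayList<>();
        int nextRange = 0; // start of the next unassigned range
        for (int i = 0; i < numMembers; i++) {
            int quota = minQuota + (i < extraPartitions ? 1 : 0);
            result.add(new int[] {nextRange, nextRange + quota});
            nextRange += quota;
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> ranges = ranges(7, 3);
        for (int i = 0; i < ranges.size(); i++) {
            int[] r = ranges.get(i);
            System.out.println("member " + i + " -> [" + r[0] + ", " + r[1] + ")");
        }
        // prints:
        // member 0 -> [0, 3)
        // member 1 -> [3, 5)
        // member 2 -> [5, 7)
    }
}
```

Because every topic with the same partition count yields the same ranges for the same sorted member list, this arithmetic is also what makes the assignment co-partitioned in the homogeneous case.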
Re: [PR] KAFKA-10816: Add health check endpoint for Kafka Connect [kafka]
C0urante commented on code in PR #16477: URL: https://github.com/apache/kafka/pull/16477#discussion_r1662610208 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java: ## @@ -32,15 +43,59 @@ public class RootResource { private final Herder herder; +private final RestRequestTimeout requestTimeout; +private final Time time; @Inject -public RootResource(Herder herder) { +public RootResource(Herder herder, RestRequestTimeout requestTimeout) { +this(herder, requestTimeout, Time.SYSTEM); +} + +// For testing only +RootResource(Herder herder, RestRequestTimeout requestTimeout, Time time) { this.herder = herder; +this.requestTimeout = requestTimeout; +this.time = time; } @GET -@Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to") +@Operation(summary = "Get details about this Connect worker and the ID of the Kafka cluster it is connected to") public ServerInfo serverInfo() { return new ServerInfo(herder.kafkaClusterId()); } + +@GET +@Path("/health") +@Operation(summary = "Health check endpoint to verify worker readiness and liveness") +public Response healthCheck() throws Throwable { +WorkerStatus workerStatus; +int statusCode; +try { +FutureCallback cb = new FutureCallback<>(); +herder.healthCheck(cb); + +long timeoutNs = TimeUnit.MILLISECONDS.toNanos(requestTimeout.healthCheckTimeoutMs()); +long deadlineNs = timeoutNs + time.nanoseconds(); +time.waitForFuture(cb, deadlineNs); + +statusCode = Response.Status.OK.getStatusCode(); +workerStatus = WorkerStatus.healthy(); +} catch (TimeoutException e) { +Stage stage = e instanceof StagedTimeoutException +? 
((StagedTimeoutException) e).stage() +: null; +if (!herder.isReady()) { +statusCode = Response.Status.SERVICE_UNAVAILABLE.getStatusCode(); +workerStatus = WorkerStatus.starting(stage); +} else { +statusCode = Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(); +workerStatus = WorkerStatus.unhealthy(stage); +} +} catch (ExecutionException e) { +throw e.getCause(); Review Comment: The exception mapper will catch this anyways, won't it? The intent was to hit [this part](https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L73-L77) of the code path and generate the standard 500-on-unexpected-error response.
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on PR #16498: URL: https://github.com/apache/kafka/pull/16498#issuecomment-2203203451 @jeffkbkim @dongnuo123 @jolshan Thanks for your comments. I addressed them.
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662541019 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -936,62 +941,90 @@ private void append( )); } -// Compute the estimated size of the records. -int estimatedSize = AbstractRecords.estimateSizeInBytes( -currentBatch.builder.magic(), -compression.type(), -recordsToAppend -); +if (isAtomic) { Review Comment: > To confirm my understanding, we take the original path when the append should be atomic, which means we verify whether all records can fit in the current batch. If not, we allocate a new batch then append the records. If the append does not need to be atomic, we append individual records until we can fit no more, then allocate a new batch. > > This will reduce the number of writes to the log as we will have more filled batches on average. In practice, only the unload partition and cleanup group metadata jobs will have non-atomic writes today so we should not expect much impact. > > Is this correct? Yes. This is correct. However, I did not make this change to improve how batches are filled but rather to ensure that we would not be stuck in the case where we have a large number of records that could not fit in one batch. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -936,62 +941,90 @@ private void append( )); } -// Compute the estimated size of the records. -int estimatedSize = AbstractRecords.estimateSizeInBytes( -currentBatch.builder.magic(), -compression.type(), -recordsToAppend -); +if (isAtomic) { Review Comment: > One other thing I noticed is the ordering. For the atomic case, we create the batch first and then replay whereas non-atomic we do the replay then add to batch. Not sure if it makes a big difference though since we moved where we enqueue the event. This is a very good point and the code was wrong. 
I changed it to check if the record can fit in the batch before replaying. I added a test for this too.
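The atomic versus non-atomic behaviour described in this thread can be sketched with a toy batcher. Everything here is an assumption made for illustration: record sizes are plain integers, `BatchAppendSketch` is a hypothetical class, and there is no replay step or log write. It only shows where the size checks sit relative to flushing.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchAppendSketch {
    private final int maxBatchBytes;
    private final List<List<Integer>> flushed = new ArrayList<>();
    private List<Integer> current = new ArrayList<>();
    private int currentBytes = 0;

    BatchAppendSketch(int maxBatchBytes) {
        this.maxBatchBytes = maxBatchBytes;
    }

    void append(List<Integer> recordSizes, boolean isAtomic) {
        if (isAtomic) {
            // Atomic: all records must land in the same batch.
            int total = recordSizes.stream().mapToInt(Integer::intValue).sum();
            if (total > maxBatchBytes) {
                throw new IllegalArgumentException("records of " + total + " bytes exceed " + maxBatchBytes);
            }
            if (currentBytes + total > maxBatchBytes) flush();
            for (int size : recordSizes) add(size);
        } else {
            // Non-atomic: records may spill over into as many batches as needed.
            for (int size : recordSizes) {
                if (size > maxBatchBytes) {
                    throw new IllegalArgumentException("record of " + size + " bytes exceeds " + maxBatchBytes);
                }
                // The fit check happens before the record is applied, so nothing
                // needs to be reverted if a new batch is required.
                if (currentBytes + size > maxBatchBytes) flush();
                add(size);
            }
        }
    }

    void flush() {
        if (!current.isEmpty()) {
            flushed.add(current);
            current = new ArrayList<>();
            currentBytes = 0;
        }
    }

    private void add(int size) {
        current.add(size);
        currentBytes += size;
    }

    List<List<Integer>> flushedBatches() {
        return flushed;
    }

    List<Integer> currentBatch() {
        return current;
    }
}
```

With a 10-byte limit, five 4-byte records are rejected by an atomic append (20 bytes cannot fit in one batch) but succeed non-atomically, spread over three batches. This is the "not stuck with a large number of records" case mentioned in the reply.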
Re: [PR] KAFKA-16584 Make log processing summary configurable or debug [kafka]
dujian0068 closed pull request #16428: KAFKA-16584 Make log processing summary configurable or debug URL: https://github.com/apache/kafka/pull/16428
Re: [PR] KAFKA-17050: Revert `group.version` [kafka]
dajac merged PR #16482: URL: https://github.com/apache/kafka/pull/16482
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662545695 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -936,62 +941,90 @@ private void append( )); } -// Compute the estimated size of the records. -int estimatedSize = AbstractRecords.estimateSizeInBytes( -currentBatch.builder.magic(), -compression.type(), -recordsToAppend -); +if (isAtomic) { +// Compute the estimated size of the records. +int estimatedSize = AbstractRecords.estimateSizeInBytes( +currentBatch.builder.magic(), +compression.type(), +recordsToAppend +); -// Check if the current batch has enough space. We check is before -// replaying the records in order to avoid having to revert back -// changes if the records do not fit within a batch. -if (estimatedSize > currentBatch.builder.maxAllowedBytes()) { -throw new RecordTooLargeException("Message batch size is " + estimatedSize + -" bytes in append to partition " + tp + " which exceeds the maximum " + -"configured size of " + currentBatch.maxBatchSize + "."); -} +// Check if the current batch has enough space. We check is before +// replaying the records in order to avoid having to revert back +// changes if the records do not fit within a batch. +if (estimatedSize > currentBatch.builder.maxAllowedBytes()) { +throw new RecordTooLargeException("Message batch size is " + estimatedSize + +" bytes in append to partition " + tp + " which exceeds the maximum " + +"configured size of " + currentBatch.maxBatchSize + "."); +} -if (!currentBatch.builder.hasRoomFor(estimatedSize)) { -// Otherwise, we write the current batch, allocate a new one and re-verify -// whether the records fit in it. -// If flushing fails, we don't catch the exception in order to let -// the caller fail the current operation. 
-flushCurrentBatch(); -maybeAllocateNewBatch( -producerId, -producerEpoch, -verificationGuard, -currentTimeMs -); +if (!currentBatch.builder.hasRoomFor(estimatedSize)) { +// Otherwise, we write the current batch, allocate a new one and re-verify +// whether the records fit in it. +// If flushing fails, we don't catch the exception in order to let +// the caller fail the current operation. +flushCurrentBatch(); +maybeAllocateNewBatch( +producerId, +producerEpoch, +verificationGuard, +currentTimeMs +); +} } -// Add the event to the list of pending events associated with the batch. -currentBatch.deferredEvents.add(event); - try { -// Apply record to the state machine. -if (replay) { -for (int i = 0; i < records.size(); i++) { -// We compute the offset of the record based on the last written offset. The -// coordinator is the single writer to the underlying partition so we can -// deduce it like this. +for (int i = 0; i < records.size(); i++) { +U recordToReplay = records.get(i); +SimpleRecord recordToAppend = recordsToAppend.get(i); + +if (replay) { coordinator.replay( -currentBatch.nextOffset + i, +currentBatch.nextOffset, producerId, producerEpoch, -records.get(i) +recordToReplay ); } -} -// Append to the batch. -for (SimpleRecord record :
Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]
cadonna commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1660761368 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/FailedProcessingException.java: ## Review Comment: What do you think of moving this exception to `streams.errors.internals`?
Re: [PR] Revert "KAFKA-16154" and mark MV 3.8-IV0 as the latest production [kafka]
jlprat commented on PR #16400: URL: https://github.com/apache/kafka/pull/16400#issuecomment-2203252464 Failing tests shouldn't be related to the changes (all tests passed locally)
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
chia7712 commented on code in PR #16491: URL: https://github.com/apache/kafka/pull/16491#discussion_r1661073537 ## docs/ops.html: ## @@ -3971,7 +3971,10 @@ Enter Migration Mode on the Brokers # KRaft controller quorum configuration controller.quorum.voters=3000@localhost:9093 -controller.listener.names=CONTROLLER +controller.listener.names=CONTROLLER + +# If the ZK broker is using a generated broker ID, disable broker ID generation +broker.id.generation.enable=false Review Comment: @showuon that is a good question. Maybe we can describe two solutions for users: 1. Set `broker.id.generation.enable=false`. During migration, all brokers should already have a unique ID, so this should be fine. 2. Increase `reserved.broker.max.id`. Users need to know the max node ID in the cluster.
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
yazgoo commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1661056552 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -131,6 +131,27 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } +@Test +public void checkIndividualConnectorBalance() { +connectors.clear(); +addNewConnector("connector1", 12); +performStandardRebalance(); +addNewConnector("connector2", 12); +performStandardRebalance(); +addNewEmptyWorkers("worker2"); +performStandardRebalance(); +performStandardRebalance(); +addNewEmptyWorkers("worker3"); +performStandardRebalance(); +performStandardRebalance(); +assertEquals(3, memberAssignments.size()); +memberAssignments.forEach((k, v) -> { Review Comment: I refactored everything in `BalancedIterator` and added it to assignTasks
Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]
bbejeck commented on code in PR #16503: URL: https://github.com/apache/kafka/pull/16503#discussion_r1662488787 ## streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java: ## Review Comment: Agreed, I'm removing it.
[PR] KAFKA-16584 Make log processing summary configurable or debug [kafka]
dujian0068 opened a new pull request, #16509: URL: https://github.com/apache/kafka/pull/16509 [KAFKA-16584 Make log processing summary configurable or debug](https://issues.apache.org/jira/browse/KAFKA-16584) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]
lucasbru commented on PR #16503: URL: https://github.com/apache/kafka/pull/16503#issuecomment-2203138366 @bbejeck yes, agreed. This one is good to go.
Re: [PR] KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest [kafka]
bbejeck commented on PR #16503: URL: https://github.com/apache/kafka/pull/16503#issuecomment-2203129697 > I wonder if we should set the config for all our integration tests to reduce the initial delay? @lucasbru - good idea, I'd prefer to do it in a follow up PR - WDYT?
Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]
chia7712 commented on code in PR #16425: URL: https://github.com/apache/kafka/pull/16425#discussion_r1661082490 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/SingleWriteMultiReadBenchmark.java: ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.jmh.util; + +import org.apache.kafka.common.utils.CopyOnWriteMap; +import org.apache.kafka.server.immutable.ImmutableMap; +import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@State(Scope.Group) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class SingleWriteMultiReadBenchmark { +private static final int TIMES = 100_000; + +@Param({"100"}) +private int mapSize; + +@Param({"0.1"}) +private double writePercentage; + +private Map concurrentHashMap; +private Map copyOnWriteMap; + +private int writeTimes; +private ImmutableMap pcollectionsImmutableMap; Review Comment: it needs `volatile`, right? ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/SingleWriteMultiReadBenchmark.java: ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.util; + +import org.apache.kafka.common.utils.CopyOnWriteMap; +import org.apache.kafka.server.immutable.ImmutableMap; +import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +
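On the `volatile` question raised in the first review comment of this message: when a single writer republishes an immutable map by reassigning a field and other threads read that field, the field generally needs to be `volatile` (or published by another safe mechanism) for readers to be guaranteed to see the new reference. A minimal sketch, using `java.util` collections rather than the PCollections-backed `ImmutableMap` from the benchmark; the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class VolatileSnapshotSketch {
    // volatile establishes a happens-before edge between the writer's assignment
    // and any later read, so readers see a fully built snapshot.
    private volatile Map<Integer, Integer> snapshot = Collections.emptyMap();

    // Intended for a single writer thread: copy, modify, then publish.
    void put(int key, int value) {
        Map<Integer, Integer> copy = new HashMap<>(snapshot);
        copy.put(key, value);
        snapshot = Collections.unmodifiableMap(copy);
    }

    // Safe to call from any number of reader threads; returns null if absent.
    Integer get(int key) {
        return snapshot.get(key);
    }
}
```

Without `volatile`, the Java memory model does not guarantee that reader threads ever observe the updated reference, which would also distort what the benchmark measures.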
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
frankvicky commented on code in PR #16491: URL: https://github.com/apache/kafka/pull/16491#discussion_r1661081534 ## docs/ops.html: ## @@ -3971,7 +3971,10 @@ Enter Migration Mode on the Brokers # KRaft controller quorum configuration controller.quorum.voters=3000@localhost:9093 -controller.listener.names=CONTROLLER +controller.listener.names=CONTROLLER + +# If the ZK broker is using a generated broker ID, disable broker ID generation +broker.id.generation.enable=false Review Comment: > If we don't add this line: broker.id.generation.enable=false, will it always fail? If `broker.id.generation.enable=true` and the user does not set `reserved.broker.max.id` to a value greater than -1, it will definitely fail; conversely, `broker.id.generation.enable=false` will make `reserved.broker.max.id` ineffective, so it will definitely not fail. > If the user needs to enable broker.id.generation.enable in their cluster because it has been working for years, what could they do? It is recommended that the user adjust `reserved.broker.max.id` to a sufficiently large value to facilitate future migrations. ## docs/ops.html: ## @@ -3971,7 +3971,10 @@ Enter Migration Mode on the Brokers # KRaft controller quorum configuration controller.quorum.voters=3000@localhost:9093 -controller.listener.names=CONTROLLER +controller.listener.names=CONTROLLER + +# If the ZK broker is using a generated broker ID, disable broker ID generation +broker.id.generation.enable=false Review Comment: Oh, as @chia7712 said, maybe the first question won't be a concern?
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
DL1231 commented on code in PR #15067: URL: https://github.com/apache/kafka/pull/15067#discussion_r1661004357 ## core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala: ## @@ -69,6 +70,13 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu } } + private def validateGroupName( +name: String): Unit = { Review Comment: Done ## core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala: ## @@ -114,6 +122,22 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu val properties = new Properties() config.forEach((key, value) => properties.setProperty(key, value)) ClientMetricsConfigs.validate(resource.name(), properties) + case GROUP => +validateGroupName(resource.name()) +val properties = new Properties() +val nullGroupConfigs = new mutable.ArrayBuffer[String]() +config.entrySet().forEach(e => { Review Comment: Done ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -615,7 +615,6 @@ class KafkaServer( ConfigType.USER -> new UserConfigHandler(quotaManagers, credentialProvider), ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers), ConfigType.IP -> new IpConfigHandler(socketServer.connectionQuotas)) - Review Comment: Done ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -479,7 +479,6 @@ class ZkAdminManager(val config: KafkaConfig, resource -> ApiError.NONE } - Review Comment: Done ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -524,7 +523,6 @@ class ZkAdminManager(val config: KafkaConfig, val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig) prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys) alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap) - Review Comment: Done ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.errors.InvalidConfigurationException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; + +/** + * Group configuration related parameters and supporting methods like validation, etc. are + * defined in this class. 
+ */ +public class GroupConfig extends AbstractConfig { + +public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = "consumer.session.timeout.ms"; + +public static final String CONSUMER_SESSION_TIMEOUT_MS_DOC += "The timeout to detect client failures when using the consumer group protocol."; + +public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = "consumer.heartbeat.interval.ms"; + +public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_DOC += "The heartbeat interval given to the members of a consumer group."; + +public static final int DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45 * 1000; + +public static final int DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5 * 1000; Review Comment: Done ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
Re: [PR] KAFKA-17042: The migration docs should remind users to set "broker.id.generation.enable" when adding broker.id [kafka]
frankvicky commented on PR #16491: URL: https://github.com/apache/kafka/pull/16491#issuecomment-2200234845 Hi @showuon and @chia7712, I have added a description for `broker.id.generation.enable=false`. PTAL.
Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]
chia7712 commented on PR #16425: URL: https://github.com/apache/kafka/pull/16425#issuecomment-2200207294 @TaiJuWu could you please add a summary to the description?
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
OmniaGM commented on PR #15968: URL: https://github.com/apache/kafka/pull/15968#issuecomment-2200102266 maybe @AndrewJSchofield or @artemlivshits can have a look into this?
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
DL1231 commented on PR #15067: URL: https://github.com/apache/kafka/pull/15067#issuecomment-2200071353 @dajac I've updated the PR. Please take a look again. Thanks.
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325: URL: https://github.com/apache/kafka/pull/16325#discussion_r1661035915 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -285,6 +362,49 @@ Found problem: "Expected the default metadata.version to be 3.3-IV2") } + @Test + def testStandaloneModeWithArguments(): Unit = { +val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAT", +"-s")) +val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null))) +val exitCode = StorageTool.runFormatCommand(namespace, config) +val tempDirs = config.logDirs +tempDirs.foreach(tempDir => { + val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME + val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID) + assertTrue(checkpointFilePath.toFile.exists) + assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost")) + Utils.delete(new File(tempDir)) +}) +assertEquals(0, exitCode) + } + +// @Test TODO +// def testControllerQuorumVotersWithArguments(): Unit = { +//val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yQQjhMQB6JAT", +// "--controller-quorum-voters", "1@localhost:9092")) +//val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null))) Review Comment: Not sure how to set controller listener
Re: [PR] KAFKA-17016: Align the behavior of GaugeWrapper and MeterWrapper [kafka]
FrankYang0529 commented on code in PR #16426: URL: https://github.com/apache/kafka/pull/16426#discussion_r1661024169 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -284,26 +284,34 @@ class BrokerTopicMetrics(name: Option[String], remoteStorageEnabled: Boolean = f } case class GaugeWrapper(metricType: String) { -@volatile private var gaugeObject: Gauge[Long] = _ -final private val gaugeLock = new Object -final val aggregatedMetric = new AggregatedMetric() +private final val removed = new AtomicBoolean(false) Review Comment: Updated it. Thank you.
Re: [PR] KAFKA-17051: ApiKeys#toHtml should exclude the APIs having unstable latest version [kafka]
m1a2st commented on code in PR #16480: URL: https://github.com/apache/kafka/pull/16480#discussion_r1661004622 ## clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java: ## @@ -87,4 +88,15 @@ public void testApiScope() { "Found some APIs missing scope definition"); } +@Test +public void testHtmlOnlyHaveStableApi() { +String html = ApiKeys.toHtml(); +for (ApiKeys apiKeys : ApiKeys.clientApis()) { +if (apiKeys.messageType.latestVersionUnstable()) { +assertFalse(html.contains(apiKeys.name), "Html should not contain unstable api: " + apiKeys.name); Review Comment: Thanks for your suggestion, I updated the test
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on PR #16325: URL: https://github.com/apache/kafka/pull/16325#issuecomment-2200108407 @jsancio , updated with controller option
```
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -q 1@localhost:9093
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata
muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -s
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata
muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -q 1@localhost:9093 -s
Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.
```
Re: [PR] KAFKA-16791: Add thread detection to ClusterTestExtensions [kafka]
chia7712 commented on PR #16499: URL: https://github.com/apache/kafka/pull/16499#issuecomment-2200088200 Could you please implement the thread detection in a separate class? That has the following benefits:
1. it is easy to write tests for the thread detection (yes, the test needs a test)
2. we can apply it to other tests which don't use the new test infra
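A standalone thread-detection helper of the kind requested above might look roughly like the following sketch. The class name `ThreadLeakDetector` and its methods are hypothetical and are not taken from the PR; the sketch only assumes that "thread detection" means comparing the set of live threads before and after a test.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not the PR's implementation: snapshots live threads at
// start() and later reports threads that are alive but were not in the snapshot.
final class ThreadLeakDetector {
    private final Set<Thread> before;

    private ThreadLeakDetector(Set<Thread> before) {
        this.before = before;
    }

    /** Captures the threads that are alive right now. */
    static ThreadLeakDetector start() {
        return new ThreadLeakDetector(new HashSet<>(Thread.getAllStackTraces().keySet()));
    }

    /** Names of threads that are alive now and were not alive at start(). */
    Set<String> leakedThreadNames() {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(thread -> thread.isAlive() && !before.contains(thread))
            .map(Thread::getName)
            .collect(Collectors.toSet());
    }
}
```

Because the helper has no dependency on any test framework, it can be unit-tested directly and called from tests that do not use the new test infrastructure, which matches the two benefits listed in the comment.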
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
C0urante commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1659130813 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } +@Override +public Config validate(Map connectorConfigs) { +List configValues = super.validate(connectorConfigs).configValues(); +new MirrorCheckpointConfig(connectorConfigs).validate().forEach(invalidConfig -> +configValues.stream() +.filter(conf -> conf.name().equals(invalidConfig.name())) +.forEach(conf -> invalidConfig.errorMessages().forEach(msg -> conf.addErrorMessage(msg)))); Review Comment: Holy Java 8 Batman! We don't need `forEach` here, can simplify a bit: ```suggestion .forEach(conf -> conf.errorMessages().addAll(invalidConfig.errorMessages()))); ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ## @@ -166,6 +173,29 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } +public List validate() { +List invalidConfigs = new ArrayList<>(); + +if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && !this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) { +Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED)) +.forEach(configValue -> { +configValue.addErrorMessage("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " + +EMIT_CHECKPOINTS_ENABLED + " set to false"); +invalidConfigs.add(configValue); +}); +} + +if ("false".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))) { Review Comment: Why is `getBoolean` used above, but the `EMIT_OFFSET_SYNCS_ENABLED` property is manually read and parsed here?
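The difference between a typed read and the manual string comparison questioned above can be illustrated with a small self-contained sketch. It does not use Kafka's `AbstractConfig`; the class `RawVersusParsedBoolean`, both method names, and the key `some.flag` are hypothetical, and the "parsed" path is only a stand-in for a typed accessor that accepts `Boolean` values and case-insensitive strings. It is not claimed to be the reviewer's reasoning.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration; none of these names come from the PR.
final class RawVersusParsedBoolean {
    private RawVersusParsedBoolean() { }

    // Same shape as the comparison in the diff: only the exact string "false" matches.
    static boolean disabledByRawLookup(Map<String, Object> originals, String key) {
        return "false".equals(Optional.ofNullable(originals.get(key)).orElse("true"));
    }

    // Stand-in for a typed read: accepts Boolean values and case-insensitive strings,
    // and treats a missing key as enabled.
    static boolean disabledByParsedLookup(Map<String, Object> originals, String key) {
        Object raw = originals.get(key);
        if (raw == null) {
            return false;
        }
        if (raw instanceof Boolean) {
            return !((Boolean) raw);
        }
        String text = raw.toString().trim();
        if (text.equalsIgnoreCase("true")) {
            return false;
        }
        if (text.equalsIgnoreCase("false")) {
            return true;
        }
        throw new IllegalArgumentException("Expected true or false but got: " + text);
    }

    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        originals.put("some.flag", Boolean.FALSE);
        // The two read paths disagree when the raw value is a Boolean rather than a String.
        System.out.println(disabledByRawLookup(originals, "some.flag"));
        System.out.println(disabledByParsedLookup(originals, "some.flag"));
    }
}
```

In this model the two paths agree for the literal string `false` and for a missing key, and diverge for `Boolean.FALSE` or `"FALSE"`, which is one way a manual comparison can behave differently from a typed read.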
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -319,11 +397,14 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes cluster.produce(topic, Integer.toString(i)); } } - private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget) throws InterruptedException { +awaitMirrorMakerStart(mm, sourceAndTarget, CONNECTOR_CLASSES); +} + +private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget, final List> connectorClasses) throws InterruptedException { Review Comment: If we want to be fancy, we can use varargs here: ```suggestion private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget, final >... connectorClasses) throws InterruptedException { ``` ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java: ## @@ -59,12 +59,13 @@ public void testMirrorCheckpointConnectorDisabled() { Set knownConsumerGroups = new HashSet<>(); knownConsumerGroups.add(CONSUMER_GROUP); +assertMirrorCheckpointConnectorDisabled(new MirrorCheckpointConnector(knownConsumerGroups, config)); +} + +private void assertMirrorCheckpointConnectorDisabled(MirrorCheckpointConnector connector) { Review Comment: Is this change still necessary? Looks like it might have been left over from a previous draft? 
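The varargs overload suggested for `awaitMirrorMakerStart` earlier in this review follows a common Java pattern: a short overload that supplies defaults, delegating to a varargs overload. The sketch below shows that pattern in isolation. All names are hypothetical, `Number` merely stands in for the connector base type, and the method returns labels instead of waiting on anything.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the overload pattern; not the test's actual code.
final class VarargsOverloadSketch {
    // Stand-in for a default list such as CONNECTOR_CLASSES.
    private static final List<Class<? extends Number>> DEFAULT_TYPES =
        Arrays.asList(Integer.class, Long.class);

    private VarargsOverloadSketch() { }

    // Convenience overload: callers that want the defaults pass nothing extra.
    static List<String> await(String flow) {
        @SuppressWarnings("unchecked")
        Class<? extends Number>[] defaults = DEFAULT_TYPES.toArray(new Class[0]);
        return await(flow, defaults);
    }

    // Varargs overload: callers list the types inline instead of building a List.
    // @SafeVarargs is permitted here because the method is static and only reads the array.
    @SafeVarargs
    static List<String> await(String flow, Class<? extends Number>... types) {
        List<String> awaited = new ArrayList<>();
        for (Class<? extends Number> type : types) {
            awaited.add(flow + ":" + type.getSimpleName());
        }
        return awaited;
    }
}
```

A call with only the flow name resolves to the single-argument overload, since Java tries non-varargs candidates first; a call such as `await("A->B", Short.class)` selects the varargs form.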
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -187,6 +194,77 @@ public void testSingleNodeCluster() throws Exception { } } +@Test +public void testClusterWithEmitOffsetDisabled() throws Exception { +Properties brokerProps = new Properties(); +EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps); +EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps); + +try (Admin adminB = clusterB.createAdminClient()) { + +// Cluster aliases +final String a = "A"; +final String b = "B"; +final String ab = a + "->" + b; +final String ba = b + "->" + a; Review Comment: Currently unused. We can either remove it, or if we want to be more explicit in our properties setup, we can use it to explicitly disable the b->a flow by setting `ba + ".enabled"` to false. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ## @@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
[PR] (WIP DO NOT MERGE) KAFKA-16502: Fix flaky EOSUncleanShutdownIntegrationTest [kafka]
mjsax opened a new pull request, #16490: URL: https://github.com/apache/kafka/pull/16490 I could not find a root cause, but the test times out waiting for KS to go into ERROR state inside the finally block. So it seems we might swallow some error? Trying to add some more logging. The logs I have access to do not contain any errors...