Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1405771339

##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -254,8 +255,9 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
-        maybeAutoCommitAndLeaveGroup(timer);
+        maybeAutocommitOnClose(timer);

Review Comment:
After reviewing the ConsumerCoordinator, I split the original method into two functions, because we should try to send an autocommit first and leave the group last.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
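A minimal sketch of the ordering described in the review comment, assuming a single close deadline shared by both steps (as with the one `Timer` created in `cleanup()`). `CloseSequenceSketch`, `runIfTimeRemains`, and the injected clock are hypothetical stand-ins, not the real `ConsumerNetworkThread` API; the point is only that the autocommit is attempted before the leave-group step and that both draw on the same time budget.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical model of a consumer close sequence; not the Kafka implementation.
final class CloseSequenceSketch {
    private final LongSupplier clockMs;              // injected clock so the sketch is testable
    final List<String> executed = new ArrayList<>(); // records which steps ran, in order

    CloseSequenceSketch(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    void cleanup(long closeTimeoutMs) {
        // One deadline for the whole shutdown, analogous to a single Timer.
        long deadlineMs = clockMs.getAsLong() + closeTimeoutMs;
        // Step 1: try to commit while the member still belongs to the group.
        runIfTimeRemains("autocommit", deadlineMs);
        // Step 2: leave the group last.
        runIfTimeRemains("leave-group", deadlineMs);
    }

    private void runIfTimeRemains(String step, long deadlineMs) {
        if (clockMs.getAsLong() < deadlineMs) {
            executed.add(step);              // a real client would send the request here
        } else {
            executed.add(step + ":skipped"); // budget exhausted; skip rather than block
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        CloseSequenceSketch sketch = new CloseSequenceSketch(() -> now[0]);
        sketch.cleanup(100);
        System.out.println(sketch.executed); // [autocommit, leave-group]
    }
}
```

Because both steps share one deadline, a slow commit eats into the time left for leaving the group rather than extending the overall close timeout.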
[jira] [Commented] (KAFKA-15658) Zookeeper.jar | CVE-2023-44981
[ https://issues.apache.org/jira/browse/KAFKA-15658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789949#comment-17789949 ]

David Dufour commented on KAFKA-15658:
--

Is there any plan to fix 3.5.x as well?

> Zookeeper.jar | CVE-2023-44981
> ---
>
> Key: KAFKA-15658
> URL: https://issues.apache.org/jira/browse/KAFKA-15658
> Project: Kafka
> Issue Type: Bug
> Reporter: masood
> Priority: Critical
> Fix For: 3.7.0, 3.6.1
>
>
> The
> [CVE-2023-44981|https://www.mend.io/vulnerability-database/CVE-2023-44981]
> vulnerability has been reported in the zookeeper.jar.
> It's worth noting that the latest version of Kafka has a dependency on
> version 3.8.2 of Zookeeper, which is also impacted by this vulnerability.
> [https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.2|https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.2.]
> Could you please verify its impact on Kafka?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405739030

##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
         assertEquals(expectedAssignment, result.targetAssignment());
     }
+    @Test
+    public void testStaticMemberReplace() {
+        TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+        Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+        context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6),
+            mkTopicAssignment(barTopicId, 5, 6)
+        ));
+
+        context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+        context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+        context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+        // Static member 3 leaves
+        context.removeMemberSubscription("member-3", "member-3");
+
+        // Another static member joins with the same instance id as the departed one
+        context.addGroupMember("member-3-a", "member-3", Arrays.asList("foo", "bar", "zar"), new HashMap<>());

Review Comment:
Ok... I am slightly confused by this comment. The new member is being added using `addGroupMember`, which internally invokes `withMembers`. I had an unwanted call to `updateMemberSubscription`, which I have removed. Probably I am missing something here.
[PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee opened a new pull request, #14842:
URL: https://github.com/apache/kafka/pull/14842

A few bugs arose from the previous issues. These are:

1. During testing or in some edge cases, the coordinator request manager might hold on to an in-flight request forever, so invoking coordinatorRequestManager.poll() would return nothing. Here we explicitly create a FindCoordinatorRequest regardless of the current request state, because we want to actively search for a coordinator.
2. ensureCoordinatorReady() might be stuck in an infinite loop if the client fails to find a coordinator. Even though the consumer would eventually be able to shut down, this is undesirable.
3. The current asyncConsumerTest mixes background/network thread shutdown with the consumer shutdown. As the goal of the module is unit testing, we should test the shutdown procedure separately. Therefore, this PR adds a Mockito.doAnswer call to applicationEventHandler.close(). Tests that exercise shutdown call shutdown() explicitly.
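Points 1 and 2 amount to two rules: always issue a fresh coordinator lookup instead of waiting on a possibly stuck in-flight request, and stop waiting once a deadline passes. The sketch below models that with a hypothetical `findCoordinator` supplier and an injected clock; it is not the `CoordinatorRequestManager` API, and a real client would also back off between attempts.

```java
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical bounded coordinator wait; names do not match Kafka's internals.
final class CoordinatorWaitSketch {
    static Optional<String> ensureCoordinatorReady(Supplier<Optional<String>> findCoordinator,
                                                   LongSupplier clockMs,
                                                   long timeoutMs) {
        long deadlineMs = clockMs.getAsLong() + timeoutMs;
        while (clockMs.getAsLong() < deadlineMs) {
            // Send a new lookup each attempt rather than reusing an in-flight one.
            Optional<String> coordinator = findCoordinator.get();
            if (coordinator.isPresent()) {
                return coordinator;
            }
            // A real client would back off here before retrying.
        }
        // Deadline reached: report failure instead of looping forever.
        return Optional.empty();
    }

    public static void main(String[] args) {
        long[] now = {0};
        Optional<String> result = ensureCoordinatorReady(Optional::empty, () -> now[0] += 10, 50);
        System.out.println(result.isPresent()); // false: gave up after the deadline
    }
}
```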
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1827266684

@dajac, thank you for another round of review. I have addressed all review comments.
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405738152

##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
         assertEquals(expectedAssignment, result.targetAssignment());
     }
+    @Test
+    public void testStaticMemberReplace() {
+        TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+        Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+        context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6),
+            mkTopicAssignment(barTopicId, 5, 6)
+        ));
+
+        context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+        context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+        context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());

Review Comment:
I see. Thanks for the explanation; I hadn't understood the usage of these methods correctly. I have removed these unwanted calls to `updateMemberSubscription`.
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405737404

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+                    updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it should replace
+                    // the previous instance iff the previous member had sent a Leave group.
+                    throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+                    // Replace the current member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());

Review Comment:
Thank you for the confirmation.
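The join/rejoin branching in this diff can be reduced to a small model: an unseen instance id creates a member; a known instance id may only be taken over by a new member id if the previous holder left with epoch `LEAVE_GROUP_STATIC_MEMBER_EPOCH` (-2, as the code comment further down this thread states), and the replacement keeps the previous member's assigned partitions. `StaticMemberSketch` and its methods are hypothetical and deliberately ignore epochs beyond that check, timers, and records.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of static-member replacement; not GroupMetadataManager.
final class StaticMemberSketch {
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    enum Outcome { NEW_STATIC_MEMBER, REPLACED }

    static final class Member {
        final String memberId;
        int memberEpoch = 1; // simplified: anything other than -2 means "still active"
        Set<Integer> assignedPartitions = new HashSet<>();

        Member(String memberId) {
            this.memberId = memberId;
        }
    }

    private final Map<String, Member> byInstanceId = new HashMap<>();

    Outcome join(String memberId, String instanceId) {
        Member previous = byInstanceId.get(instanceId);
        if (previous == null) {
            byInstanceId.put(instanceId, new Member(memberId));
            return Outcome.NEW_STATIC_MEMBER;
        }
        // Only a released instance id (previous member sent a leave) may be taken over.
        if (previous.memberEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            throw new IllegalStateException("Instance id " + instanceId
                + " is still held by member " + previous.memberId);
        }
        Member replacement = new Member(memberId);
        // The replacement inherits the departed member's partitions.
        replacement.assignedPartitions = new HashSet<>(previous.assignedPartitions);
        byInstanceId.put(instanceId, replacement);
        return Outcome.REPLACED;
    }

    void leave(String instanceId) {
        Member member = byInstanceId.get(instanceId);
        if (member != null) {
            member.memberEpoch = LEAVE_GROUP_STATIC_MEMBER_EPOCH;
        }
    }

    Member member(String instanceId) {
        return byInstanceId.get(instanceId);
    }

    public static void main(String[] args) {
        StaticMemberSketch group = new StaticMemberSketch();
        group.join("member-3", "member-3");
        group.leave("member-3");
        System.out.println(group.join("member-3-a", "member-3")); // REPLACED
    }
}
```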
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405737202

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -223,20 +238,30 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
         Map memberSpecs = new HashMap<>();
         // Prepare the member spec for all members.
-        members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
-            member,
-            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-            subscriptionMetadata
-        )));
+        members.forEach((memberId, member) -> {
+            memberSpecs.put(memberId, createAssignmentMemberSpec(
+                member,
+                targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+                subscriptionMetadata
+            ));
+        });
         // Update the member spec if updated or deleted members.
         updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
             if (updatedMemberOrNull == null) {
                 memberSpecs.remove(memberId);
             } else {
+                ConsumerGroupMember member = members.get(memberId);
+                Assignment assignment;
+                // A new static member joins and needs to replace an existing departed one.
+                if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+                    assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY);

Review Comment:
Yes, your understanding is correct. I see what you are saying about how this won't work when a new static member joins. I have updated the group to expose the current set of static members in the group.
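A simplified model of the lookup in the new `else` branch: when an updated member id is unknown but its instance id is registered in `staticMembers`, the starting assignment is the departed member's target assignment. Partitions are plain integers here, the maps are hypothetical stand-ins for the builder's fields, and the fallback to the member's own target assignment is an assumption, since the diff excerpt stops before that branch.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper; mirrors only the shape of the TargetAssignmentBuilder change.
final class TargetAssignmentSketch {
    static Set<Integer> startingAssignment(String memberId,
                                           String instanceId,                 // null for dynamic members
                                           Set<String> currentMemberIds,
                                           Map<String, String> staticMembers, // instanceId -> memberId
                                           Map<String, Set<Integer>> targetAssignment) {
        boolean isNewMember = !currentMemberIds.contains(memberId);
        if (isNewMember && instanceId != null && staticMembers.containsKey(instanceId)) {
            // New member id reusing a known instance id: inherit the previous holder's target.
            return targetAssignment.getOrDefault(staticMembers.get(instanceId), Set.of());
        }
        // Assumed fallback: the member's own target, or empty for a brand-new member.
        return targetAssignment.getOrDefault(memberId, Set.of());
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> target = Map.of("member-3", Set.of(5, 6));
        Set<Integer> inherited = startingAssignment("member-3-a", "member-3",
            Set.of("member-1", "member-2"), Map.of("member-3", "member-3"), target);
        System.out.println(inherited.equals(Set.of(5, 6))); // true
    }
}
```

The `instanceId != null` check runs before `containsKey`, which matters because immutable maps from `Map.of` reject null keys.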
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405733434

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1004,27 +1118,102 @@ private CoordinatorResult consumerGr
         return new CoordinatorResult<>(records, response);
     }
+    private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.

Review Comment:
Done.
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732647

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1004,27 +1118,102 @@ private CoordinatorResult consumerGr
         return new CoordinatorResult<>(records, response);
     }
+    private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) {

Review Comment:
Done. Moved the methods next to `throwIfMemberEpochIsInvalid`, added Javadocs, and updated the argument names.
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732004

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -766,6 +834,11 @@ private void throwIfMemberEpochIsInvalid(
         int receivedMemberEpoch,
         List ownedTopicPartitions
     ) {
+        // If a static member rejoins, it's previous epoch would be -2. In such a
+        // case, we don't need to fence the member.
+        if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
+            return;
+        }

Review Comment:
No, it is not. Removed it.
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732141

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+                    updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it should replace
+                    // the previous instance iff the previous member had sent a Leave group.
+                    throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+                    // Replace the current member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMemberAndCancelTimers(records, group.groupId(), member.memberId());

Review Comment:
Added a log line.
[jira] [Created] (KAFKA-15902) Topic partitions cannot be automatically cleaned up, leading to disk space occupation
Gao Fei created KAFKA-15902:
---

Summary: Topic partitions cannot be automatically cleaned up, leading to disk space occupation
Key: KAFKA-15902
URL: https://issues.apache.org/jira/browse/KAFKA-15902
Project: Kafka
Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Gao Fei

We are unable to determine the cause of this situation, but the error logs from the faulty node process keep showing the following error:
{code:java}
ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
java.nio.BufferOverflowException
	at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
	at java.base/java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:882)
	at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
	at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
	at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:506)
	at kafka.log.Log.$anonfun$roll$8(Log.scala:2066)
	at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:2066)
	at scala.Option.foreach(Option.scala:437)
	at kafka.log.Log.$anonfun$roll$2(Log.scala:2066)
	at kafka.log.Log.roll(Log.scala:2482)
	at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1859)
	at kafka.log.Log.deleteSegments(Log.scala:2482)
	at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1847)
	at kafka.log.Log.deleteOldSegments(Log.scala:1916)
	at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:1092)
	at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:1089)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at kafka.log.LogManager.cleanupLogs(LogManager.scala:1089)
	at kafka.log.LogManager.$anonfun$startupWithConfigOverrides$2(LogManager.scala:429)
	at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
{code}
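The top frames show `DirectByteBuffer.putLong` throwing `java.nio.BufferOverflowException`, which is what a `ByteBuffer` does when a relative put needs more bytes than `remaining()`. The sketch below reproduces only that mechanism with a fixed-size direct buffer and 12-byte entries (an 8-byte timestamp plus a 4-byte relative offset, the layout of a time index entry). It says nothing about why the index was already full in the reported case, which the reporter could not determine.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Demonstrates the exception type only; this is not Kafka's TimeIndex.
final class IndexOverflowSketch {
    static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset

    // Writes entries into a buffer sized for capacityEntries; returns how many were written.
    static int appendEntries(int capacityEntries, int attempts) {
        ByteBuffer index = ByteBuffer.allocateDirect(capacityEntries * ENTRY_SIZE);
        int written = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                index.putLong(1_700_000_000_000L + i); // timestamp (ms)
                index.putInt(i);                       // offset relative to the segment base
                written++;
            }
        } catch (BufferOverflowException e) {
            // Thrown by putLong once fewer than 8 bytes remain, as in the reported trace.
            System.out.println("overflow after " + written + " entries");
        }
        return written;
    }

    public static void main(String[] args) {
        appendEntries(2, 3); // prints "overflow after 2 entries"
    }
}
```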
Re: [PR] MINOR: Correcting Javadoc for ConnectAssertions [kafka]
yashmayya commented on code in PR #14827:
URL: https://github.com/apache/kafka/pull/14827#discussion_r1405642306

##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java:
##
@@ -524,7 +524,7 @@ public void assertConnectorIsStopped(String connectorName, String detailMessage)
      * @param connectorState
      * @param numTasks the expected number of tasks
      * @param tasksState
-     * @return true if the connector and tasks are in RUNNING state; false otherwise
+     * @return true if the connector and tasks are in expected states; false otherwise

Review Comment:
```suggestion
     * @return true if the connector and tasks are in the expected state; false otherwise
```
nit: they're all expected to be in the same state
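To make the Javadoc wording concrete: the method takes one expected connector state and one expected state shared by all tasks. The predicate below is a hypothetical sketch of that contract using plain strings; the real `ConnectAssertions` helper is not shown in this hunk, so the exact task-count check is an assumption.

```java
import java.util.List;

// Hypothetical predicate mirroring the documented parameters; not ConnectAssertions itself.
final class ConnectorStateSketch {
    static boolean connectorAndTasksInState(String actualConnectorState,
                                            List<String> actualTaskStates,
                                            String connectorState,
                                            int numTasks,
                                            String tasksState) {
        return connectorState.equals(actualConnectorState)
            && actualTaskStates.size() == numTasks                     // assumed exact match
            && actualTaskStates.stream().allMatch(tasksState::equals); // every task in the same state
    }

    public static void main(String[] args) {
        System.out.println(connectorAndTasksInState(
            "RUNNING", List.of("FAILED", "FAILED"), "RUNNING", 2, "FAILED")); // true
    }
}
```

The usage in `main` shows why "RUNNING" was too narrow: the connector and its tasks can be expected in different states, but all tasks share one expected state.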
[jira] [Updated] (KAFKA-15857) Introduce LocalLogStartOffset and TieredOffset in OffsetSpec.
[ https://issues.apache.org/jira/browse/KAFKA-15857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-15857:
---
Fix Version/s: 3.7.0

> Introduce LocalLogStartOffset and TieredOffset in OffsetSpec.
> -
>
> Key: KAFKA-15857
> URL: https://issues.apache.org/jira/browse/KAFKA-15857
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Satish Duggana
> Assignee: Christo Lolov
> Priority: Major
> Labels: need-kip, tiered-storage
> Fix For: 3.7.0
>
>
> Introduce EarliestLocalOffset and TieredOffset in OffsetSpec which will help
> in finding respective offsets while using AdminClient#listOffsets().
> EarliestLocalOffset - local log start offset of a topic partition.
> TieredOffset - Highest offset up to which the segments were copied to remote
> storage.
> We can discuss further on naming and semantics of these offset specs.
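The two proposed specs map to positions that already exist in a tiered partition's log. As a sketch of the described semantics only: the enum and `LogBounds` fields below are hypothetical (the issue itself leaves naming open), and nothing here calls the real `OffsetSpec`, which today offers factories such as `OffsetSpec.earliest()` and `OffsetSpec.latest()`.

```java
// Hypothetical model of the proposed offset specs; not the Kafka Admin API.
final class OffsetSpecSketch {
    enum Spec { EARLIEST, EARLIEST_LOCAL, TIERED, LATEST }

    static final class LogBounds {
        final long logStartOffset;      // earliest offset, possibly only in remote storage
        final long localLogStartOffset; // earliest offset still on local disk
        final long highestTieredOffset; // highest offset copied to remote storage
        final long logEndOffset;        // next offset to be written

        LogBounds(long logStartOffset, long localLogStartOffset,
                  long highestTieredOffset, long logEndOffset) {
            this.logStartOffset = logStartOffset;
            this.localLogStartOffset = localLogStartOffset;
            this.highestTieredOffset = highestTieredOffset;
            this.logEndOffset = logEndOffset;
        }
    }

    static long resolve(Spec spec, LogBounds bounds) {
        switch (spec) {
            case EARLIEST:       return bounds.logStartOffset;
            case EARLIEST_LOCAL: return bounds.localLogStartOffset;
            case TIERED:         return bounds.highestTieredOffset;
            case LATEST:         return bounds.logEndOffset;
            default:             throw new IllegalArgumentException("Unknown spec " + spec);
        }
    }

    public static void main(String[] args) {
        // Offsets 0..149 are in remote storage; 100..199 are still local.
        LogBounds bounds = new LogBounds(0, 100, 149, 200);
        System.out.println(resolve(Spec.EARLIEST_LOCAL, bounds)); // 100
        System.out.println(resolve(Spec.TIERED, bounds));         // 149
    }
}
```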
[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
[ https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789901#comment-17789901 ]

A. Sophie Blee-Goldman commented on KAFKA-15798:

cc [~mjsax] [~wcarlson5] – maybe some motivation to remove, or at least block, the named topologies feature in 3.7.

> Flaky Test
> NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
> -
>
> Key: KAFKA-15798
> URL: https://issues.apache.org/jira/browse/KAFKA-15798
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Justine Olshan
> Priority: Major
> Labels: flaky-test
>
> I saw a few examples recently. 2 have the same error, but the third is
> different
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
>
> The failure is like
> {code:java}
> java.lang.AssertionError: Did not receive all 5 records from topic
> output-stream-1 within 6 ms, currently accumulated data is [] Expected:
> is a value equal to or greater than <5> but: <0> was less than <5>{code}
> The other failure was
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
> {code:java}
> java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code}
[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
[ https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789898#comment-17789898 ]

A. Sophie Blee-Goldman commented on KAFKA-15798:

Took a quick look at this in the name of running down some of the worst flaky tests in Streams. I think it's pretty clear that this is failing because of the state updater thread (see below), but it's not as clear to me whether this hints at a real bug with the state updater thread or whether it only broke the named topologies feature. If it's the latter, we should probably block people from using named topologies when the state updater thread is enabled in 3.7. Although I'm actually leaning towards going a step further and just taking out the named topologies altogether – we can just remove the "public" API classes for now, as extracting all the internal logic is somewhat of a bigger project that we shouldn't rush.

Of course, this is all assuming there is something about the state updater that broke named topologies – someone more familiar with the state updater should definitely verify that this isn't a real bug in normal Streams first! cc [~cadonna] [~lucasb]

Oh, and this is how I know the state updater thread is responsible: if you look at [the graph of failure rates for this test|https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.streams.integration.NamedTopologyIntegrationTest=FLAKY=shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()], you'll see it goes from literally zero flakiness to the 2nd most commonly failing test in all of Streams on Oct 4th. This is the day we turned on the state updater thread by default.

(It's also a bit concerning that we didn't catch this sooner. The uptick in failure rate of this test is actually quite sudden. Would be great if we could somehow manually alert on this sort of thing.)
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827083641

@clolov: The queries that you had in your earlier comments were addressed by @kamalcph; please take a look. I plan to merge these changes if you have no further comments. Thanks.
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
satishd merged PR #14787: URL: https://github.com/apache/kafka/pull/14787
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
satishd commented on PR #14787: URL: https://github.com/apache/kafka/pull/14787#issuecomment-1827078547 Test failures in the Jenkins jobs are unrelated to this change; merging it to trunk.
Re: [PR] KAFKA-9693: Kafka latency spikes caused by log segment flush on roll [kafka]
github-actions[bot] commented on PR #13782: URL: https://github.com/apache/kafka/pull/13782#issuecomment-1827074604 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]
github-actions[bot] commented on PR #14293: URL: https://github.com/apache/kafka/pull/14293#issuecomment-1827074429 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Add retry for CI [kafka]
ex172000 commented on PR #14828: URL: https://github.com/apache/kafka/pull/14828#issuecomment-1827030588 > Thanks for the PR. The Jenkinsfile already sets those settings to retry failed tests in Gradle. See here: > > https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/Jenkinsfile#L32 > > . > I think that the confusion comes from the fact that the retried tests are still reported as failures. I am looking into improving this here: #14743. Thank you @dajac for the explanation. That makes sense. Would you mind reviewing #14829? It looks like those tests are really flaky.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
pprovenzano commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1405538961 ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -121,8 +122,38 @@ public static Map createAssignmentMap(int[] replicas, Uuid[] dire * Create an array with the specified number of entries set to {@link #UNASSIGNED}. */ public static Uuid[] unassignedArray(int length) { +return array(length, UNASSIGNED); +} + +/** + * Create an array with the specified number of entries set to {@link #MIGRATING}. + */ +public static Uuid[] migratingArray(int length) { +return array(length, MIGRATING); +} + +/** + * Create an array with the specified number of entries set to the specified value. + */ +private static Uuid[] array(int length, Uuid value) { Uuid[] array = new Uuid[length]; -Arrays.fill(array, UNASSIGNED); +Arrays.fill(array, value); return array; } + +/** + * Check if a directory is online, given a sorted list of online directories. + * @param dir The directory to check + * @param sortedOnlineDirs The sorted list of online directories + * @return true if the directory is considered online, false otherwise + */ +public static boolean isOnline(Uuid dir, List sortedOnlineDirs) { +if (UNASSIGNED.equals(dir) || MIGRATING.equals(dir)) { Review Comment: We should only allow a directory ID to be MIGRATING if we are in migration mode.
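The following is a minimal sketch of the check the reviewer suggests. It assumes that an explicit `migrationMode` flag is passed in. This is a hypothetical illustration, not the PR's code. `java.util.UUID` stands in for Kafka's `Uuid`, the sentinel values are placeholders rather than the real `UNASSIGNED`/`MIGRATING` constants, and treating `UNASSIGNED` as online is an assumption. Because the online list is sorted, a binary search can test membership.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch: java.util.UUID stands in for org.apache.kafka.common.Uuid,
// and both sentinels are placeholders, not Kafka's real constants.
final class DirectoryIdSketch {
    static final UUID UNASSIGNED = new UUID(0L, 0L);
    static final UUID MIGRATING = new UUID(0L, 1L);

    private DirectoryIdSketch() {}

    /**
     * Returns true if dir should be considered online.
     * Assumes sortedOnlineDirs is sorted in UUID natural order.
     * MIGRATING is accepted only while migrationMode is true.
     */
    static boolean isOnline(UUID dir, List<UUID> sortedOnlineDirs, boolean migrationMode) {
        if (UNASSIGNED.equals(dir)) {
            return true; // assumption: an unassigned replica is not treated as offline
        }
        if (MIGRATING.equals(dir)) {
            return migrationMode; // the reviewer's suggested restriction
        }
        // binarySearch returns a non-negative index only when dir is present.
        return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
    }

    public static void main(String[] args) {
        UUID a = new UUID(5L, 5L);
        UUID b = new UUID(7L, 7L);
        List<UUID> online = new ArrayList<>(List.of(b, a));
        Collections.sort(online);
        System.out.println(isOnline(a, online, false));         // prints true
        System.out.println(isOnline(MIGRATING, online, false)); // prints false
        System.out.println(isOnline(MIGRATING, online, true));  // prints true
    }
}
```

Passing the mode in explicitly keeps the helper static and side-effect free. A real implementation might instead read the mode from broker state.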
Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address if multiple ad… [kafka]
ijuma commented on PR #14813: URL: https://github.com/apache/kafka/pull/14813#issuecomment-1826881644 Thanks for the PR. Let's cherry-pick this to 3.6 after it is merged to master.
Re: [PR] KAFKA-14509: [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]
riedelmax commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1405422751 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -445,6 +446,42 @@ public List listGroups(List statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + +public List consumerGroupDescribe( +List groupIds, +long committedOffset +) { +List response = new ArrayList<>(); + +for (String groupId: groupIds) { +Group group = groups.get(groupId, committedOffset); Review Comment: Good idea. I added a `consumerGroup` method accordingly and made the structure more consistent with `describeGroups`. Is it correct for consumer groups to consider the group "Dead" when we can't find a group with the groupId?
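To make the question concrete, here is a hypothetical sketch of the lookup loop. In this sketch, a group id that is not found is reported with the `Dead` state, as the comment describes for `describeGroups`. The `DescribedGroup` class and the map of group states are illustrative stand-ins rather than `GroupMetadataManager` types, and the sketch does not model the `committedOffset` snapshot read. The comment asks whether `ConsumerGroupDescribe` should do this or return an error for a missing group, so treat the `Dead` fallback as one option rather than settled behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these types are illustrative stand-ins, not Kafka classes.
final class DescribeSketch {
    static final String DEAD = "Dead";

    static final class DescribedGroup {
        final String groupId;
        final String state;

        DescribedGroup(String groupId, String state) {
            this.groupId = groupId;
            this.state = state;
        }
    }

    private DescribeSketch() {}

    /** Describes each requested group; ids with no known group are reported as "Dead". */
    static List<DescribedGroup> consumerGroupDescribe(List<String> groupIds, Map<String, String> groupStates) {
        List<DescribedGroup> response = new ArrayList<>();
        for (String groupId : groupIds) {
            String state = groupStates.get(groupId);
            // A missing group yields a "Dead" entry instead of failing the whole request.
            response.add(new DescribedGroup(groupId, state == null ? DEAD : state));
        }
        return response;
    }

    public static void main(String[] args) {
        Map<String, String> states = new HashMap<>();
        states.put("g1", "Stable");
        for (DescribedGroup g : consumerGroupDescribe(List.of("g1", "missing"), states)) {
            System.out.println(g.groupId + " -> " + g.state); // prints "g1 -> Stable", then "missing -> Dead"
        }
    }
}
```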
Re: [PR] KAFKA-14509: [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]
riedelmax commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1405412781 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -545,6 +547,32 @@ public String currentAssignmentSummary() { ')'; } +public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) { Review Comment: It would be possible, of course, but we would then have redundant code to map from a Map of partitions to a List of TopicPartitions in both ConsumerGroupMember.java and ConsumerGroup.java. I think it's better the way it is: pass the targetAssignment to ConsumerGroupMember.java and use the private static method topicPartitionsFromMap. Let me know what you think.
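This is a sketch of the kind of shared helper the comment describes. It assumes the assignment is held as a `Map` from topic id to a set of partition indexes. `TopicPartitions` here is a hypothetical stand-in for the generated response type, and `java.util.UUID` replaces Kafka's `Uuid`. The thread does not show the body of the real `topicPartitionsFromMap`. With one static helper, the member's current assignment and the passed-in target assignment can both go through the same conversion. That shared path is what avoids the redundancy the comment mentions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch: not Kafka's ConsumerGroupMember code.
final class AssignmentMapping {
    // Stand-in for a generated "topic id + partitions" response entry.
    static final class TopicPartitions {
        final UUID topicId;
        final List<Integer> partitions;

        TopicPartitions(UUID topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    private AssignmentMapping() {}

    /** Converts a topic-id -> partition-set map into a list with one entry per topic. */
    static List<TopicPartitions> topicPartitionsFromMap(Map<UUID, Set<Integer>> assignment) {
        List<TopicPartitions> result = new ArrayList<>(assignment.size());
        for (Map.Entry<UUID, Set<Integer>> entry : assignment.entrySet()) {
            // Sorting is a choice made in this sketch so the output is deterministic.
            List<Integer> partitions = new ArrayList<>(entry.getValue());
            Collections.sort(partitions);
            result.add(new TopicPartitions(entry.getKey(), partitions));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<UUID, Set<Integer>> target = new HashMap<>();
        target.put(new UUID(1L, 1L), Set.of(2, 0, 1));
        for (TopicPartitions tp : topicPartitionsFromMap(target)) {
            System.out.println(tp.topicId + " " + tp.partitions);
        }
    }
}
```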
[jira] [Created] (KAFKA-15901) Implement client changes to support telemetry APIs
Apoorv Mittal created KAFKA-15901: - Summary: Implement client changes to support telemetry APIs Key: KAFKA-15901 URL: https://issues.apache.org/jira/browse/KAFKA-15901 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal Implement changes in the Java client to support telemetry APIs. Support should be added to the Producer, Consumer and Admin clients to configure the telemetry reporter and support the APIs. Corresponding changes are needed in NetworkClient to allow sending telemetry calls from the Java client.
[PR] KAFKA-15894: Fix KafkaApis.updateRecordConversionStats to ensure MessageConversionsTimeMs and TemporaryMemoryBytes are recorded correctly [kafka]
runom opened a new pull request, #14841: URL: https://github.com/apache/kafka/pull/14841

`KafkaApis.updateRecordConversionStats` may be called multiple times for a single request, and `request.messageConversionsTimeNanos` and `request.temporaryMemoryBytes` are overwritten on each call.

```scala
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
  processingStats.forKeyValue { (tp, info) =>
    updateRecordConversionStats(request, tp, info)
  }
}
```

```scala
private def updateRecordConversionStats(request: RequestChannel.Request,
                                        tp: TopicPartition,
                                        conversionStats: RecordValidationStats): Unit = {
  ...
    request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
  }
  request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}
```

In that case, MessageConversionsTimeMs and TemporaryMemoryBytes are not recorded correctly, because each call overwrites the values set by the previous one. This PR fixes the code to add up each value instead of overwriting it.

To test this behavior, this PR also adds a test that sends a ProduceRequest containing multiple partitions and calls `updateRecordConversionStats` through `processingStatsCallback`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
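The difference between overwriting and accumulating can be sketched outside `KafkaApis`. This is a simplified, hypothetical Java model, not the PR's Scala code. `PartitionStats` and `RequestStats` stand in for `RecordValidationStats` and the fields on `RequestChannel.Request`. The model omits the condition that the `...` in the `updateRecordConversionStats` snippet elides. With two partitions, the overwrite variant keeps only the second partition's numbers, while the accumulating variant reports their sum.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the fix described in the PR; not KafkaApis code.
final class ConversionStatsSketch {
    // Per-partition stats (stand-in for RecordValidationStats).
    static final class PartitionStats {
        final long conversionTimeNanos;
        final long temporaryMemoryBytes;

        PartitionStats(long conversionTimeNanos, long temporaryMemoryBytes) {
            this.conversionTimeNanos = conversionTimeNanos;
            this.temporaryMemoryBytes = temporaryMemoryBytes;
        }
    }

    // Per-request totals (stand-in for the fields on RequestChannel.Request).
    static final class RequestStats {
        long messageConversionsTimeNanos;
        long temporaryMemoryBytes;
    }

    private ConversionStatsSketch() {}

    // Behaviour before the fix: each partition overwrites the previous value.
    static void updateOverwrite(RequestStats request, PartitionStats stats) {
        request.messageConversionsTimeNanos = stats.conversionTimeNanos;
        request.temporaryMemoryBytes = stats.temporaryMemoryBytes;
    }

    // Behaviour after the fix: each partition's values are added to the running total.
    static void updateAccumulate(RequestStats request, PartitionStats stats) {
        request.messageConversionsTimeNanos += stats.conversionTimeNanos;
        request.temporaryMemoryBytes += stats.temporaryMemoryBytes;
    }

    public static void main(String[] args) {
        Map<String, PartitionStats> perPartition = new LinkedHashMap<>();
        perPartition.put("topic-0", new PartitionStats(100, 1024));
        perPartition.put("topic-1", new PartitionStats(50, 2048));

        RequestStats overwritten = new RequestStats();
        RequestStats accumulated = new RequestStats();
        // Mimics processingStatsCallback invoking the update once per partition.
        perPartition.values().forEach(s -> {
            updateOverwrite(overwritten, s);
            updateAccumulate(accumulated, s);
        });
        System.out.println(overwritten.messageConversionsTimeNanos + " " + overwritten.temporaryMemoryBytes); // prints 50 2048
        System.out.println(accumulated.messageConversionsTimeNanos + " " + accumulated.temporaryMemoryBytes); // prints 150 3072
    }
}
```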