Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-26 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1405771339


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -254,8 +255,9 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
-maybeAutoCommitAndLeaveGroup(timer);
+maybeAutocommitOnClose(timer);

Review Comment:
   After reviewing the ConsumerCoordinator, I split the original method into two functions, so that we try to send an autocommit first and leave the group last.
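The ordering described in this comment can be sketched as a small model. This is a minimal sketch under stated assumptions: `CloseSequence`, `Budget`, and the request labels are hypothetical stand-ins, not the real `ConsumerNetworkThread` or `Timer` API, and both steps are assumed to share the single close timeout that `cleanup()` creates from `closeTimeout` in the hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the close ordering; not the real ConsumerNetworkThread code.
final class CloseSequence {
    // Minimal stand-in for a close timer: a fixed budget that each step consumes.
    static final class Budget {
        private long remainingMs;

        Budget(long remainingMs) {
            this.remainingMs = remainingMs;
        }

        boolean isExpired() {
            return remainingMs <= 0;
        }

        void consume(long ms) {
            remainingMs = Math.max(0, remainingMs - ms);
        }
    }

    /** Returns the requests "sent" during close, in order, under one shared timeout. */
    static List<String> cleanup(boolean autoCommitEnabled, long closeTimeoutMs,
                                long commitCostMs, long leaveCostMs) {
        List<String> sent = new ArrayList<>();
        Budget timer = new Budget(closeTimeoutMs);
        // Step 1: commit consumed offsets while this member still owns its partitions.
        if (autoCommitEnabled && !timer.isExpired()) {
            sent.add("OffsetCommit");
            timer.consume(commitCostMs);
        }
        // Step 2: leave the group last (assumed rationale: leaving first could let the
        // partitions be reassigned before the final commit lands).
        if (!timer.isExpired()) {
            sent.add("LeaveGroup");
            timer.consume(leaveCostMs);
        }
        return sent;
    }
}
```

With a generous timeout both requests go out in order; if the commit uses up the whole budget, the leave step is skipped rather than blocking close.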



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15658) Zookeeper.jar | CVE-2023-44981

2023-11-26 Thread David Dufour (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789949#comment-17789949
 ] 

David Dufour commented on KAFKA-15658:
--

Is there any plan to fix 3.5.x as well?

> Zookeeper.jar | CVE-2023-44981 
> ---
>
> Key: KAFKA-15658
> URL: https://issues.apache.org/jira/browse/KAFKA-15658
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Critical
> Fix For: 3.7.0, 3.6.1
>
>
> The [CVE-2023-44981|https://www.mend.io/vulnerability-database/CVE-2023-44981] vulnerability has been reported in zookeeper.jar.
> It's worth noting that the latest version of Kafka depends on version 3.8.2 of ZooKeeper, which is also affected by this vulnerability.
> [https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.2|https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.2]
> Could you please verify its impact on Kafka?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405739030


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
 assertEquals(expectedAssignment, result.targetAssignment());
 }
 
+@Test
+public void testStaticMemberReplace() {
+TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+"my-group",
+20
+);
+
+Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2),
+mkTopicAssignment(barTopicId, 1, 2)
+));
+
+context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4),
+mkTopicAssignment(barTopicId, 3, 4)
+));
+
+context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 5, 6),
+mkTopicAssignment(barTopicId, 5, 6)
+));
+
+context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+// Static member 3 leaves
+context.removeMemberSubscription("member-3", "member-3");
+
+// Another static member joins with the same instance id as the departed one
+context.addGroupMember("member-3-a", "member-3", Arrays.asList("foo", "bar", "zar"), new HashMap<>());

Review Comment:
   OK, I am slightly confused by this comment. The new member is being added using `addGroupMember`, which internally invokes `withMembers`. I had an unwanted call to `updateMemberSubscription`, which I have removed. Perhaps I am missing something here.






[PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-26 Thread via GitHub


philipnee opened a new pull request, #14842:
URL: https://github.com/apache/kafka/pull/14842

   A few bugs were introduced while addressing previous issues. These are:
   1. During testing, or in some edge cases, the coordinator request manager might hold on to an in-flight request forever, so invoking coordinatorRequestManager.poll() would return nothing. Here we explicitly create a FindCoordinatorRequest regardless of the current request state, because we want to actively search for a coordinator.
   2. ensureCoordinatorReady() might loop forever if the client fails to find a coordinator. Even though the consumer would eventually be able to shut down, this is undesirable.
   3. The current asyncConsumerTest mixes the background/network thread shutdown with the consumer shutdown. As the goal of the module is unit testing, we should test the shutdown procedure separately. Therefore, this PR adds a Mockito.doAnswer call to applicationEventHandler.close(). Tests that exercise shutdown call shutdown() explicitly.
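Items 1 and 2 can be sketched together as a bounded lookup loop. This is a hypothetical, blocking simplification: `CoordinatorLookup`, the injected clock, and the `Supplier` standing in for sending a FindCoordinatorRequest are assumptions, not the consumer's actual request-manager code, which is poll-driven. A real implementation would also back off between attempts instead of retrying immediately.

```java
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical, blocking simplification; the real consumer drives this from its network thread.
final class CoordinatorLookup {
    /**
     * Sends a fresh lookup on every attempt (instead of waiting on a possibly stuck
     * in-flight request) and stops once a coordinator is found or the timeout elapses.
     */
    static Optional<String> ensureCoordinatorReady(Supplier<Optional<String>> sendFindCoordinator,
                                                   LongSupplier nowMs,
                                                   long timeoutMs) {
        long deadlineMs = nowMs.getAsLong() + timeoutMs;
        while (nowMs.getAsLong() < deadlineMs) {
            Optional<String> coordinator = sendFindCoordinator.get();
            if (coordinator.isPresent()) {
                return coordinator;
            }
        }
        // Bounded: return empty rather than spin forever, so close() can still proceed.
        return Optional.empty();
    }
}
```

Injecting the clock keeps the sketch deterministic in tests; when no broker ever answers, the call returns empty once the deadline passes.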





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405739030


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
 assertEquals(expectedAssignment, result.targetAssignment());
 }
 
+@Test
+public void testStaticMemberReplace() {
+TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+"my-group",
+20
+);
+
+Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2),
+mkTopicAssignment(barTopicId, 1, 2)
+));
+
+context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4),
+mkTopicAssignment(barTopicId, 3, 4)
+));
+
+context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 5, 6),
+mkTopicAssignment(barTopicId, 5, 6)
+));
+
+context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+// Static member 3 leaves
+context.removeMemberSubscription("member-3", "member-3");
+
+// Another static member joins with the same instance id as the departed one
+context.addGroupMember("member-3-a", "member-3", Arrays.asList("foo", "bar", "zar"), new HashMap<>());

Review Comment:
   OK, I am slightly confused by this comment. The new member is being added using `addGroupMember`, which internally invokes `withMembers`. I had an unwanted call to `updateMemberSubscription`, which I have removed.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1827266684

   @dajac, thank you for another round of review. I have addressed all review comments.





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405738152


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
 assertEquals(expectedAssignment, result.targetAssignment());
 }
 
+@Test
+public void testStaticMemberReplace() {
+TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+"my-group",
+20
+);
+
+Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2),
+mkTopicAssignment(barTopicId, 1, 2)
+));
+
+context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4),
+mkTopicAssignment(barTopicId, 3, 4)
+));
+
+context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 5, 6),
+mkTopicAssignment(barTopicId, 5, 6)
+));
+
+context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());

Review Comment:
   I see. Thanks for the explanation; I hadn't understood the usage of these methods correctly. I have removed these unwanted calls to `updateMemberSubscription`.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405737404


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+if (memberEpoch == 0) {
+// A new static member joins or the existing static member rejoins.
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+} else {
+// Static member rejoins with a different member id so it should replace
+// the previous instance iff the previous member had sent a Leave group.
+throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+// Replace the current member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());

Review Comment:
   Thank you for the confirmation.
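The static-member branch of the `consumerGroupHeartbeat` hunk above can be reduced to a small decision function. This is a sketch under stated assumptions: `StaticJoin` and `Outcome` are hypothetical names, only joins with epoch 0 are modeled, `IllegalStateException` stands in for whatever error the coordinator actually returns, and -2 is taken from the `LEAVE_GROUP_STATIC_MEMBER_EPOCH` comment elsewhere in this PR. It also omits carrying `assignedPartitions()` over to the replacing member.

```java
// Hypothetical reduction of the static-member join branch; not GroupMetadataManager code.
final class StaticJoin {
    // Epoch a static member is left with after a leave-group heartbeat (-2 per the PR's comment).
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    enum Outcome { NEW_STATIC_MEMBER, REPLACE_PREVIOUS_MEMBER }

    /**
     * @param existingEpoch epoch of the member currently registered under this
     *                      instance id, or null if no member holds it
     * @param receivedEpoch epoch sent in the heartbeat; 0 means join or rejoin
     */
    static Outcome onJoin(Integer existingEpoch, int receivedEpoch) {
        if (receivedEpoch != 0) {
            throw new IllegalArgumentException("only epoch-0 joins are modeled here");
        }
        if (existingEpoch == null) {
            return Outcome.NEW_STATIC_MEMBER;
        }
        // Mirrors throwIfInstanceIdIsUnreleased: the instance id may only be taken
        // over once the previous member has left with the static-leave epoch.
        if (existingEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            throw new IllegalStateException("instance id is still in use");
        }
        return Outcome.REPLACE_PREVIOUS_MEMBER;
    }
}
```

The key design point is that replacement is gated on the previous member having explicitly left; a second process reusing a live instance id is rejected instead of silently fencing the first.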






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405737202


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -223,20 +238,30 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
 Map memberSpecs = new HashMap<>();
 
 // Prepare the member spec for all members.
-members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
-member,
-targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-subscriptionMetadata
-)));
+members.forEach((memberId, member) -> {
+memberSpecs.put(memberId, createAssignmentMemberSpec(
+member,
+targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+subscriptionMetadata
+));
+});
 
 // Update the member spec if updated or deleted members.
 updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
 if (updatedMemberOrNull == null) {
 memberSpecs.remove(memberId);
 } else {
+ConsumerGroupMember member = members.get(memberId);
+Assignment assignment;
+// A new static member joins and needs to replace an existing departed one.
+if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY);

Review Comment:
   Yes, your understanding is correct. I see what you are saying about how this 
won't work when a new static member joins. I have updated the group to expose 
the current set of static members in the group.
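The lookup added to `TargetAssignmentBuilder.build()` can be modeled with plain maps: when an updated member's id is unknown but its instance id is registered, it takes over the departed member's target assignment. `StaticAssignmentLookup` is hypothetical, and partitions are simplified to integers in place of the real `Assignment` type.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the lookup in the TargetAssignmentBuilder hunk; partitions are plain ints.
final class StaticAssignmentLookup {
    /**
     * @param members          member ids currently known to the builder
     * @param staticMembers    instance id -> member id currently registered for it
     * @param targetAssignment member id -> target partitions
     */
    static Set<Integer> assignmentFor(String memberId, String instanceId,
                                      Set<String> members,
                                      Map<String, String> staticMembers,
                                      Map<String, Set<Integer>> targetAssignment) {
        if (!members.contains(memberId) && instanceId != null
                && staticMembers.containsKey(instanceId)) {
            // Unknown member id but known instance id: a replacement static member
            // inherits the target assignment of the member it replaces.
            return targetAssignment.getOrDefault(staticMembers.get(instanceId), Collections.emptySet());
        }
        return targetAssignment.getOrDefault(memberId, Collections.emptySet());
    }
}
```

In the `testStaticMemberReplace` scenario, `member-3-a` joins with instance id `member-3` and, under this model, would inherit partitions 5 and 6.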






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405733434


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1004,27 +1118,102 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) {
+if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+// The new member can't join.

Review Comment:
   Done.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1004,27 +1118,102 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) {

Review Comment:
   Done, moved the methods next to `throwIfMemberEpochIsInvalid`, added 
Javadocs and updated the argument names.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732004


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -766,6 +834,11 @@ private void throwIfMemberEpochIsInvalid(
 int receivedMemberEpoch,
 List ownedTopicPartitions
 ) {
+// If a static member rejoins, its previous epoch would be -2. In such a
+// case, we don't need to fence the member.
+if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
+return;
+}

Review Comment:
   No, it is not. Removed it.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732141


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+if (memberEpoch == 0) {
+// A new static member joins or the existing static member rejoins.
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+} else {
+// Static member rejoins with a different member id so it should replace
+// the previous instance iff the previous member had sent a Leave group.
+throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+// Replace the current member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+removeMemberAndCancelTimers(records, group.groupId(), member.memberId());

Review Comment:
   Added a log line.






[jira] [Created] (KAFKA-15902) Topic partitions cannot be automatically cleaned up, leading to disk space occupation

2023-11-26 Thread Gao Fei (Jira)
Gao Fei created KAFKA-15902:
---

 Summary: Topic partitions cannot be automatically cleaned up, leading to disk space occupation
 Key: KAFKA-15902
 URL: https://issues.apache.org/jira/browse/KAFKA-15902
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Gao Fei


We are unable to determine the cause of this situation, but the logs of the faulty node's process keep showing the following error:
{code:java}
ERROR Uncaught exception in scheduled task 'kafka-log-retention' 
(kafka.utils.KafkaScheduler)
java.nio.BufferOverflowException
    at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
    at java.base/java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:882)
    at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
    at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
    at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:506)
    at kafka.log.Log.$anonfun$roll$8(Log.scala:2066)
    at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:2066)
    at scala.Option.foreach(Option.scala:437)
    at kafka.log.Log.$anonfun$roll$2(Log.scala:2066)
    at kafka.log.Log.roll(Log.scala:2482)
    at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1859)
    at kafka.log.Log.deleteSegments(Log.scala:2482)
    at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1847)
    at kafka.log.Log.deleteOldSegments(Log.scala:1916)
    at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:1092)
    at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:1089)
    at scala.collection.immutable.List.foreach(List.scala:333)
    at kafka.log.LogManager.cleanupLogs(LogManager.scala:1089)
    at kafka.log.LogManager.$anonfun$startupWithConfigOverrides$2(LogManager.scala:429)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829) {code}
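The top frames show `DirectByteBuffer.putLong` throwing `java.nio.BufferOverflowException`, which the JDK raises when a relative put needs more bytes than the buffer has remaining. The sketch below reproduces only that JDK behavior; it does not explain why the time index was already full when the segment was rolled, which the report leaves open. `IndexOverflowDemo` is hypothetical, and the 12-byte entry (8-byte timestamp plus 4-byte relative offset) is an assumed layout for illustration.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Reproduces only the JDK behavior at the top of the stack trace; this is not Kafka's TimeIndex.
final class IndexOverflowDemo {
    // Assumed entry layout: 8-byte timestamp followed by a 4-byte relative offset.
    static final int ENTRY_SIZE = 12;

    /** Appends entries without checking capacity; returns how many fit before the overflow. */
    static int appendUntilOverflow(int maxEntries) {
        ByteBuffer index = ByteBuffer.allocateDirect(maxEntries * ENTRY_SIZE);
        int written = 0;
        try {
            while (true) {
                index.putLong(System.currentTimeMillis()); // throws once fewer than 8 bytes remain
                index.putInt(written);
                written++;
            }
        } catch (BufferOverflowException e) {
            return written;
        }
    }

    /** A guarded append that checks capacity first instead of relying on the exception. */
    static boolean tryAppend(ByteBuffer index, long timestamp, int relativeOffset) {
        if (index.remaining() < ENTRY_SIZE) {
            return false; // index is full; the caller has to handle this case explicitly
        }
        index.putLong(timestamp).putInt(relativeOffset);
        return true;
    }
}
```

`appendUntilOverflow(3)` writes three entries and then fails on the fourth `putLong`, which is the same exception and frame seen at the top of the reported trace.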





Re: [PR] MINOR: Correcting Javadoc for ConnectAssertions [kafka]

2023-11-26 Thread via GitHub


yashmayya commented on code in PR #14827:
URL: https://github.com/apache/kafka/pull/14827#discussion_r1405642306


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java:
##
@@ -524,7 +524,7 @@ public void assertConnectorIsStopped(String connectorName, String detailMessage)
  * @param connectorState
  * @param numTasks the expected number of tasks
  * @param tasksState
- * @return true if the connector and tasks are in RUNNING state; false otherwise
+ * @return true if the connector and tasks are in expected states; false otherwise

Review Comment:
   ```suggestion
* @return true if the connector and tasks are in the expected state; false otherwise
   ```
   nit: they're all expected to be in the same state
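To illustrate what the corrected `@return` wording means, here is a hypothetical, self-contained check; it does not reproduce `ConnectAssertions`' actual signature or how it fetches connector status. `ConnectorStateCheck` and its `State` enum are assumptions, and the task count is assumed to be an exact match.

```java
import java.util.List;

// Hypothetical check illustrating "in the expected state"; not the ConnectAssertions API.
final class ConnectorStateCheck {
    enum State { RUNNING, PAUSED, STOPPED, FAILED }

    /**
     * @return true if the connector is in {@code expectedConnectorState}, there are exactly
     *         {@code numTasks} tasks, and every task is in {@code expectedTasksState}
     */
    static boolean inExpectedState(State actualConnectorState, List<State> actualTaskStates,
                                   State expectedConnectorState, int numTasks,
                                   State expectedTasksState) {
        return actualConnectorState == expectedConnectorState
                && actualTaskStates.size() == numTasks
                && actualTaskStates.stream().allMatch(s -> s == expectedTasksState);
    }
}
```

A paused connector with two paused tasks passes when PAUSED is expected and fails when RUNNING is expected, which is why the old "in RUNNING state" wording was inaccurate.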






[jira] [Updated] (KAFKA-15857) Introduce LocalLogStartOffset and TieredOffset in OffsetSpec.

2023-11-26 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15857:
---
Fix Version/s: 3.7.0

> Introduce LocalLogStartOffset and TieredOffset in OffsetSpec.
> -
>
> Key: KAFKA-15857
> URL: https://issues.apache.org/jira/browse/KAFKA-15857
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Christo Lolov
>Priority: Major
>  Labels: need-kip, tiered-storage
> Fix For: 3.7.0
>
>
> Introduce EarliestLocalOffset and TieredOffset in OffsetSpec, which will help in finding the respective offsets when using AdminClient#listOffsets().
> EarliestLocalOffset - local log start offset of a topic partition.
> TieredOffset - highest offset up to which the segments were copied to remote storage.
> We can discuss the naming and semantics of these offset specs further.
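The two proposed specs can be illustrated with a toy model over a list of segments. `TieredOffsetsModel` and `Segment` are hypothetical; this is not the Admin client API, and the final naming and semantics were still open for discussion. Here, EarliestLocalOffset is taken as the base offset of the first segment still on local disk, and TieredOffset as the end offset of the last segment copied to remote storage.

```java
import java.util.List;
import java.util.OptionalLong;

// Toy model of the two proposed offset specs; not an Admin client API.
final class TieredOffsetsModel {
    /** A segment covering offsets [baseOffset, endOffset] and where its data lives. */
    static final class Segment {
        final long baseOffset;
        final long endOffset;
        final boolean onLocalDisk;
        final boolean copiedToRemote;

        Segment(long baseOffset, long endOffset, boolean onLocalDisk, boolean copiedToRemote) {
            this.baseOffset = baseOffset;
            this.endOffset = endOffset;
            this.onLocalDisk = onLocalDisk;
            this.copiedToRemote = copiedToRemote;
        }
    }

    /** EarliestLocalOffset: the local log start offset, i.e. the first offset still on local disk. */
    static OptionalLong earliestLocalOffset(List<Segment> segments) {
        return segments.stream().filter(s -> s.onLocalDisk).mapToLong(s -> s.baseOffset).min();
    }

    /** TieredOffset: the highest offset up to which segments were copied to remote storage. */
    static OptionalLong tieredOffset(List<Segment> segments) {
        return segments.stream().filter(s -> s.copiedToRemote).mapToLong(s -> s.endOffset).max();
    }
}
```

Both methods return an empty `OptionalLong` when no segment qualifies; a real API would need to define what to return in that case.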





[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()

2023-11-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789901#comment-17789901
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15798:


cc [~mjsax] [~wcarlson5] – maybe some motivation to remove, or at least block, the named topologies feature in 3.7.

> Flaky Test 
> NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
> -
>
> Key: KAFKA-15798
> URL: https://issues.apache.org/jira/browse/KAFKA-15798
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I saw a few examples recently. Two have the same error, but the third is different:
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
>  
> The failure looks like this:
> {code:java}
> java.lang.AssertionError: Did not receive all 5 records from topic output-stream-1 within 6 ms, currently accumulated data is [] Expected: is a value equal to or greater than <5> but: <0> was less than <5>{code}
> The other failure was
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
> {code:java}
> java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code}





[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()

2023-11-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789898#comment-17789898
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15798:


Took a quick look at this in the name of running down some of the worst flaky 
tests in Streams. I think it's pretty clear that this is failing because of the 
state updater thread (see below), but it's not as clear to me whether this 
hints at a real bug with the state updater thread or whether it only broke the 
named topologies feature.

If it's the latter, we should probably block people from using named topologies 
when the state updater thread is enabled in 3.7. Although I'm actually leaning 
towards going a step further and just taking out the named topologies 
altogether – we can just remove the "public" API classes for now, as extracting 
all the internal logic is somewhat of a bigger project that we shouldn't rush.

Of course, this is all assuming there is something about the state updater that 
broke named topologies – someone more familiar with the state updater should 
definitely verify that this isn't a real bug in normal Streams first! cc 
[~cadonna] [~lucasb] 

Oh, and this is how I know the state updater thread is responsible: if you look 
at [the graph of failure rates for this 
test|https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.streams.integration.NamedTopologyIntegrationTest=FLAKY=shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()],
 you'll see it goes from literally zero flakiness to the 2nd most commonly 
failing test in all of Streams on Oct 4th. This is the day we turned on the 
state updater thread by default.

(It's also a bit concerning that we didn't catch this sooner. The uptick in failure rate of this test is actually quite sudden. It would be great if we could somehow manually alert on this sort of thing.)

> Flaky Test 
> NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
> -
>
> Key: KAFKA-15798
> URL: https://issues.apache.org/jira/browse/KAFKA-15798
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I saw a few examples recently. Two have the same error, but the third is different:
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
>  
> The failure looks like this:
> {code:java}
> java.lang.AssertionError: Did not receive all 5 records from topic 
> output-stream-1 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <5> but: <0> was less than <5>{code}
> The other failure was
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
> {code:java}
> java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-26 Thread via GitHub


satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827083641

   @clolov: The queries from your earlier comments were addressed by 
@kamalcph; please take a look. I plan to merge these changes if you have no 
further comments. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-26 Thread via GitHub


satishd merged PR #14787:
URL: https://github.com/apache/kafka/pull/14787





Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-26 Thread via GitHub


satishd commented on PR #14787:
URL: https://github.com/apache/kafka/pull/14787#issuecomment-1827078547

   Test failures in the Jenkins jobs are unrelated to this change; merging it 
to trunk.





Re: [PR] KAFKA-9693: Kafka latency spikes caused by log segment flush on roll [kafka]

2023-11-26 Thread via GitHub


github-actions[bot] commented on PR #13782:
URL: https://github.com/apache/kafka/pull/13782#issuecomment-1827074604

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]

2023-11-26 Thread via GitHub


github-actions[bot] commented on PR #14293:
URL: https://github.com/apache/kafka/pull/14293#issuecomment-1827074429

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: Add retry for CI [kafka]

2023-11-26 Thread via GitHub


ex172000 commented on PR #14828:
URL: https://github.com/apache/kafka/pull/14828#issuecomment-1827030588

   > Thanks for the PR. The Jenkinsfile already sets those settings to retry 
failed tests in Gradle. See here:
   > 
   > 
https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/Jenkinsfile#L32
   > 
   > I think that the confusion comes from the fact that the retried tests are 
still reported as failures. I am looking into improving this here: #14743.
   
   Thank you @dajac for the explanation. That makes sense. Would you mind 
reviewing #14829? It looks like those tests are really flaky.





Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-11-26 Thread via GitHub


pprovenzano commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1405538961


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -121,8 +122,38 @@ public static Map createAssignmentMap(int[] replicas, Uuid[] dire
      * Create an array with the specified number of entries set to {@link #UNASSIGNED}.
      */
     public static Uuid[] unassignedArray(int length) {
+        return array(length, UNASSIGNED);
+    }
+
+    /**
+     * Create an array with the specified number of entries set to {@link #MIGRATING}.
+     */
+    public static Uuid[] migratingArray(int length) {
+        return array(length, MIGRATING);
+    }
+
+    /**
+     * Create an array with the specified number of entries set to the specified value.
+     */
+    private static Uuid[] array(int length, Uuid value) {
         Uuid[] array = new Uuid[length];
-        Arrays.fill(array, UNASSIGNED);
+        Arrays.fill(array, value);
         return array;
     }
+
+    /**
+     * Check if a directory is online, given a sorted list of online directories.
+     * @param dir              The directory to check
+     * @param sortedOnlineDirs The sorted list of online directories
+     * @return                 true if the directory is considered online, false otherwise
+     */
+    public static boolean isOnline(Uuid dir, List sortedOnlineDirs) {
+        if (UNASSIGNED.equals(dir) || MIGRATING.equals(dir)) {
Review Comment:
   We should only allow for a directory ID to be MIGRATING if we are in 
migration mode.
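A minimal sketch of the shared `array()` fill helper and a sorted-list membership check that follows the reviewer's suggestion. This is illustrative only: it uses `java.util.UUID` instead of Kafka's `Uuid`, the sentinel values and the `inMigrationMode` flag are hypothetical stand-ins, and the diff above is truncated, so the real `isOnline` body is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class DirectoryIdSketch {
    // Hypothetical stand-in sentinels; Kafka's DirectoryId defines its own Uuid constants.
    static final UUID UNASSIGNED = new UUID(0L, 0L);
    static final UUID MIGRATING = new UUID(0L, 1L);

    // Fill an array of the given length with a single value, like the array() helper in the diff.
    static UUID[] array(int length, UUID value) {
        UUID[] result = new UUID[length];
        Arrays.fill(result, value);
        return result;
    }

    // Hypothetical variant of isOnline: UNASSIGNED counts as online, MIGRATING only
    // while in migration mode (the reviewer's point), and any other ID must be present
    // in the sorted list of online directories.
    static boolean isOnline(UUID dir, List<UUID> sortedOnlineDirs, boolean inMigrationMode) {
        if (UNASSIGNED.equals(dir)) {
            return true;
        }
        if (MIGRATING.equals(dir)) {
            return inMigrationMode;
        }
        // Collections.binarySearch requires the list to be sorted by natural ordering.
        return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
    }

    public static void main(String[] args) {
        UUID a = new UUID(5L, 5L);
        UUID b = new UUID(7L, 1L);
        UUID offline = new UUID(6L, 0L);
        List<UUID> online = new ArrayList<>(List.of(b, a));
        Collections.sort(online);
        System.out.println(isOnline(a, online, false));         // true
        System.out.println(isOnline(offline, online, false));   // false
        System.out.println(isOnline(MIGRATING, online, false)); // false
        System.out.println(isOnline(MIGRATING, online, true));  // true
        System.out.println(array(3, UNASSIGNED).length);        // 3
    }
}
```

Passing the sorted list keeps each lookup at O(log n). The caller is responsible for sorting once rather than on every check.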






Re: [PR] KAFKA-15817: Avoid reconnecting to the same IP address if multiple ad… [kafka]

2023-11-26 Thread via GitHub


ijuma commented on PR #14813:
URL: https://github.com/apache/kafka/pull/14813#issuecomment-1826881644

   Thanks for the PR. Let's cherry-pick this to 3.6 after this is merged to 
master.





Re: [PR] KAFKA-14509: [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-11-26 Thread via GitHub


riedelmax commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1405422751


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -445,6 +446,42 @@ public List listGroups(List statesFi
         return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List consumerGroupDescribe(
+        List groupIds,
+        long committedOffset
+    ) {
+        List response = new ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);

Review Comment:
   Good idea. I added a `consumerGroup` method accordingly and made the 
structure more consistent with `describeGroups`.
   
   Is it correct for consumer groups to consider the group "Dead" when we can't 
find a group with the groupId? 
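One possible answer to the question above, sketched under assumptions: an unknown group ID is reported with a "Dead" state instead of being dropped from the response. This assumes that is the convention the classic `describeGroups` path follows. The `Group` and `DescribedGroup` records, the `describe` method, and the "Stable" state string are simplified, hypothetical stand-ins, not the coordinator's real types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DescribeGroupsSketch {
    // Hypothetical, simplified stand-ins for the coordinator's group and response types.
    record Group(String groupId, String state) {}
    record DescribedGroup(String groupId, String state) {}

    // Describe each requested group. A missing group ID still produces an entry,
    // marked "Dead", so the response has one entry per requested ID.
    static List<DescribedGroup> describe(Map<String, Group> groups, List<String> groupIds) {
        List<DescribedGroup> response = new ArrayList<>();
        for (String groupId : groupIds) {
            Group group = groups.get(groupId);
            if (group == null) {
                response.add(new DescribedGroup(groupId, "Dead"));
            } else {
                response.add(new DescribedGroup(groupId, group.state()));
            }
        }
        return response;
    }

    public static void main(String[] args) {
        Map<String, Group> groups = new HashMap<>();
        groups.put("orders", new Group("orders", "Stable"));
        for (DescribedGroup g : describe(groups, List.of("orders", "unknown"))) {
            System.out.println(g.groupId() + " -> " + g.state());
        }
        // orders -> Stable
        // unknown -> Dead
    }
}
```

A dedicated error code per missing group would be the alternative design. Which one the API should use is exactly what the review question leaves open.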






Re: [PR] KAFKA-14509: [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-11-26 Thread via GitHub


riedelmax commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1405412781


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -545,6 +547,32 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) {

Review Comment:
   It would be possible, of course, but we would then have redundant code in 
both ConsumerGroupMember.java and ConsumerGroup.java to map a Map of partitions 
to a List of TopicPartitions.
   I think it's better the way it is: pass the targetAssignment to 
ConsumerGroupMember.java and use the private static method 
topicPartitionsFromMap.
   
   Let me know what you think.
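To illustrate the trade-off, here is a minimal sketch of a single static helper that turns an assignment map (topic ID to partition set) into response entries. Keeping it in one place is what avoids duplicating the mapping in two classes. The `TopicPartitions` record and the String topic IDs are hypothetical simplifications; the real method's signature is not shown in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class TopicPartitionsSketch {
    // Hypothetical stand-in for the response's TopicPartitions entry.
    record TopicPartitions(String topicId, List<Integer> partitions) {}

    // Convert topic ID -> partition set into a list of entries. Sorting topics
    // (via TreeMap) and partitions keeps the output deterministic.
    static List<TopicPartitions> topicPartitionsFromMap(Map<String, Set<Integer>> assignment) {
        List<TopicPartitions> result = new ArrayList<>();
        new TreeMap<>(assignment).forEach((topicId, partitions) -> {
            List<Integer> sorted = new ArrayList<>(partitions);
            Collections.sort(sorted);
            result.add(new TopicPartitions(topicId, sorted));
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> target = Map.of("topic-b", Set.of(2, 0), "topic-a", Set.of(1));
        System.out.println(topicPartitionsFromMap(target));
    }
}
```

Both the member's current assignment and the passed-in target assignment could then go through the same helper.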






[jira] [Created] (KAFKA-15901) Implement client changes to support telemetry APIs

2023-11-26 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15901:
-

 Summary: Implement client changes to support telemetry APIs
 Key: KAFKA-15901
 URL: https://issues.apache.org/jira/browse/KAFKA-15901
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


Implement changes in the Java client to support the telemetry APIs. Support 
should be added in the Producer, Consumer, and Admin clients to configure a 
telemetry reporter and use the APIs, along with corresponding changes in 
NetworkClient to allow the Java client to send telemetry calls.





[PR] KAFKA-15894: Fix KafkaApis.updateRecordConversionStats to ensure MessageConversionsTimeMs and TemporaryMemoryBytes are recorded correctly [kafka]

2023-11-26 Thread via GitHub


runom opened a new pull request, #14841:
URL: https://github.com/apache/kafka/pull/14841

   `KafkaApis.updateRecordConversionStats` may be called multiple times for a 
request, and `request.messageConversionsTimeNanos` and 
`request.temporaryMemoryBytes` are overwritten in this method. 
   
   ```scala
   def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
     processingStats.forKeyValue { (tp, info) =>
       updateRecordConversionStats(request, tp, info)
     }
   }
   ```
   ```scala
   private def updateRecordConversionStats(request: RequestChannel.Request,
                                           tp: TopicPartition,
                                           conversionStats: RecordValidationStats): Unit = {
     ...
       request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
     }
     request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
   }
   ```
   
   As a result, when a request covers multiple partitions, 
MessageConversionsTimeMs and TemporaryMemoryBytes reflect only the last 
partition processed and are not recorded correctly.
   This pull request changes the code to add up each value instead of 
overwriting it.
   
   To test this behavior, this PR also adds a test that sends a ProduceRequest 
containing multiple partitions and calls `updateRecordConversionStats` through 
`processingStatsCallback`.
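The difference between overwriting and accumulating per-partition stats can be shown in a small Java sketch. `Stats` and `Request` are simplified, hypothetical stand-ins for `RecordValidationStats` and `RequestChannel.Request`. The actual fix is in Scala in `KafkaApis`, and the conditional that guards `messageConversionsTimeNanos` there is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConversionStatsSketch {
    // Hypothetical stand-in for per-partition RecordValidationStats.
    record Stats(long conversionTimeNanos, long temporaryMemoryBytes) {}

    // Hypothetical stand-in for the request-level fields on RequestChannel.Request.
    static final class Request {
        long messageConversionsTimeNanos;
        long temporaryMemoryBytes;
    }

    // Buggy pattern: each per-partition callback overwrites the totals,
    // so only the last partition's values survive.
    static void overwrite(Request request, Stats stats) {
        request.messageConversionsTimeNanos = stats.conversionTimeNanos();
        request.temporaryMemoryBytes = stats.temporaryMemoryBytes();
    }

    // Fixed pattern: add each partition's values to the running totals.
    static void accumulate(Request request, Stats stats) {
        request.messageConversionsTimeNanos += stats.conversionTimeNanos();
        request.temporaryMemoryBytes += stats.temporaryMemoryBytes();
    }

    public static void main(String[] args) {
        // One request touching two partitions, as in the multi-partition ProduceRequest test.
        Map<String, Stats> perPartition = new LinkedHashMap<>();
        perPartition.put("topic-0", new Stats(100, 1024));
        perPartition.put("topic-1", new Stats(250, 2048));

        Request overwritten = new Request();
        Request accumulated = new Request();
        perPartition.values().forEach(s -> {
            overwrite(overwritten, s);
            accumulate(accumulated, s);
        });
        System.out.println(overwritten.messageConversionsTimeNanos + " " + overwritten.temporaryMemoryBytes); // 250 2048
        System.out.println(accumulated.messageConversionsTimeNanos + " " + accumulated.temporaryMemoryBytes); // 350 3072
    }
}
```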
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

