Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-29 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1619973284


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be copied per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = 
"remote.log.manager.copy.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote copy quota management. " +
+"The default value is 61, which means there are 60 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 
61;
+
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = 
"remote.log.manager.copy.quota.window.size.seconds";
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each 
sample for remote copy quota management. " +
+"The default value is 1 second.";
+public static final int 
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.fetch.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of 
bytes that can be fetched from remote storage to local storage per second. " +
+"This is a global limit for all the partitions that are being 
fetched from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be fetched per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP 
= "remote.log.manager.fetch.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote fetch quota management. " 
+
+"The default value is 11, which means there are 10 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM 
= 11;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619961360


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   That was for the subscription type right which is just one word? Homogeneous 
uniform assignment builder is kind of a mouthful. 
   I used the wrong term, by user I meant anyone looking at the code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619961360


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   That was for the subscription type right? Homogeneous uniform assignment 
builder is kind of a mouthful. 
   I used the wrong term, by user I meant anyone looking at the code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16853) Split RemoteLogManagerScheduledThreadPool

2024-05-29 Thread Abhijeet Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850578#comment-17850578
 ] 

Abhijeet Kumar commented on KAFKA-16853:


Thanks [~christo_lolov] . I will work on this.

> Split RemoteLogManagerScheduledThreadPool
> -
>
> Key: KAFKA-16853
> URL: https://issues.apache.org/jira/browse/KAFKA-16853
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Abhijeet Kumar
>Priority: Major
>
> *Summary*
> To begin with create just the RemoteDataExpirationThreadPool and move 
> expiration to it. Keep all settings as if the only thread pool was the 
> RemoteLogManagerScheduledThreadPool. Ensure that the new thread pool is wired 
> correctly to the RemoteLogManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619937439


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619924951


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {

Review Comment:
   nit: can we add java docs for this method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the 

Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619920514


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final 
TaskInfo task,
 }
 return true;
 }
+
+private static Map 
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
 final Map kafkaStreamsAssignments) {
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set rackAwareAssignmentTags = new 
HashSet<>(getRackAwareAssignmentTags(applicationState));
+final TagStatistics tagStatistics = new 
TagStatistics(applicationState);
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clientsByUuid = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+final Function> clientTagGetter 
= createClientTagGetter(applicationState);
+
+final Map pendingStandbyTasksToClientId = new 
HashMap<>();
+for (final TaskId statefulTaskId : statefulTaskIds) {
+for (final KafkaStreamsAssignment assignment : 
clientsByUuid.values()) {
+if (assignment.tasks().containsKey(statefulTaskId)) {
+assignStandbyTasksToClientsWithDifferentTags(
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+statefulTaskId,
+assignment.processId(),
+rackAwareAssignmentTags,
+streamStates,
+kafkaStreamsAssignments,
+tasksToRemainingStandbys,
+tagStatistics.tagKeyToValues,
+tagStatistics.tagEntryToClients,
+pendingStandbyTasksToClientId,
+clientTagGetter
+);
+}
+}
+}
+
+if (!tasksToRemainingStandbys.isEmpty()) {
+assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid,
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+tasksToRemainingStandbys);
+}
+
+return kafkaStreamsAssignments;
+}
+
+private static Map 
loadBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
  final Map kafkaStreamsAssignments) 
{
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clients = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet()));
+for (final TaskId task : statefulTaskIds) {
+assignStandbyTasksForActiveTask(numStandbyReplicas, clients,
+tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task);
+}
+return kafkaStreamsAssignments;
+}
+
+private static void assignStandbyTasksForActiveTask(final int 
numStandbyReplicas,
+final Map clients,
+final Map tasksToRemainingStandbys,
+final 
ConstrainedPrioritySet standbyTaskClientsByTaskLoad,

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619917677


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619915938


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619915938


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619907267


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619896882


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final 
TaskInfo task,
 }
 return true;
 }
+
+private static Map 
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
 final Map kafkaStreamsAssignments) {
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set rackAwareAssignmentTags = new 
HashSet<>(getRackAwareAssignmentTags(applicationState));
+final TagStatistics tagStatistics = new 
TagStatistics(applicationState);
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clientsByUuid = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+final Function> clientTagGetter 
= createClientTagGetter(applicationState);
+
+final Map pendingStandbyTasksToClientId = new 
HashMap<>();
+for (final TaskId statefulTaskId : statefulTaskIds) {
+for (final KafkaStreamsAssignment assignment : 
clientsByUuid.values()) {
+if (assignment.tasks().containsKey(statefulTaskId)) {
+assignStandbyTasksToClientsWithDifferentTags(
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+statefulTaskId,
+assignment.processId(),
+rackAwareAssignmentTags,
+streamStates,
+kafkaStreamsAssignments,
+tasksToRemainingStandbys,
+tagStatistics.tagKeyToValues,
+tagStatistics.tagEntryToClients,
+pendingStandbyTasksToClientId,
+clientTagGetter
+);
+}
+}
+}
+
+if (!tasksToRemainingStandbys.isEmpty()) {
+assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid,
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+tasksToRemainingStandbys);
+}
+
+return kafkaStreamsAssignments;
+}
+
+private static Map 
loadBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
  final Map kafkaStreamsAssignments) 
{
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clients = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet()));
+for (final TaskId task : statefulTaskIds) {
+assignStandbyTasksForActiveTask(numStandbyReplicas, clients,
+tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task);
+}
+return kafkaStreamsAssignments;
+}
+
+private static void assignStandbyTasksForActiveTask(final int 
numStandbyReplicas,
+final Map clients,
+final Map tasksToRemainingStandbys,
+final 
ConstrainedPrioritySet standbyTaskClientsByTaskLoad,

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619898066


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map> oldAssignment 

Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619896882


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final 
TaskInfo task,
 }
 return true;
 }
+
+private static Map 
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
 final Map kafkaStreamsAssignments) {
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set rackAwareAssignmentTags = new 
HashSet<>(getRackAwareAssignmentTags(applicationState));
+final TagStatistics tagStatistics = new 
TagStatistics(applicationState);
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clientsByUuid = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+final Function> clientTagGetter 
= createClientTagGetter(applicationState);
+
+final Map pendingStandbyTasksToClientId = new 
HashMap<>();
+for (final TaskId statefulTaskId : statefulTaskIds) {
+for (final KafkaStreamsAssignment assignment : 
clientsByUuid.values()) {
+if (assignment.tasks().containsKey(statefulTaskId)) {
+assignStandbyTasksToClientsWithDifferentTags(
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+statefulTaskId,
+assignment.processId(),
+rackAwareAssignmentTags,
+streamStates,
+kafkaStreamsAssignments,
+tasksToRemainingStandbys,
+tagStatistics.tagKeyToValues,
+tagStatistics.tagEntryToClients,
+pendingStandbyTasksToClientId,
+clientTagGetter
+);
+}
+}
+}
+
+if (!tasksToRemainingStandbys.isEmpty()) {
+assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid,
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+tasksToRemainingStandbys);
+}
+
+return kafkaStreamsAssignments;
+}
+
+private static Map 
loadBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
  final Map kafkaStreamsAssignments) 
{
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clients = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet()));
+for (final TaskId task : statefulTaskIds) {
+assignStandbyTasksForActiveTask(numStandbyReplicas, clients,
+tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task);
+}
+return kafkaStreamsAssignments;
+}
+
+private static void assignStandbyTasksForActiveTask(final int 
numStandbyReplicas,
+final Map clients,
+final Map tasksToRemainingStandbys,
+final 
ConstrainedPrioritySet standbyTaskClientsByTaskLoad,

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619894784


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above

Review Comment:
   nit: // Revoke the partitions that either:
   // 1. Are not part of the member's current subscription.
   // 2. Exceed the maximum quota assigned to each member.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619892264


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.

Review Comment:
   nit: members that* should



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619888416


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *  Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
-private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
+private static final Class UNMODIFIALBE_MAP_CLASS = 
Collections.unmodifiableMap(new HashMap<>()).getClass();
+private static final Class EMPTY_MAP_CLASS = 
Collections.emptyMap().getClass();
+
+/**
+ * @return True if the provided map is an UnmodifiableMap or EmptyMap. 
Those classes are not
+ * public hence we cannot use the `instanceof` operator.
+ */
+private static boolean isImmutableMap(Map map) {
+return UNMODIFIALBE_MAP_CLASS.isInstance(map) || 
EMPTY_MAP_CLASS.isInstance(map);

Review Comment:
   nit: UNMODIFIABLE *



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619808751


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java:
##
@@ -30,13 +30,13 @@
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
  */
-class ConstrainedPrioritySet {
+public class ConstrainedPrioritySet {
 
 private final PriorityQueue clientsByTaskLoad;
 private final BiFunction constraint;
 private final Set uniqueClients = new HashSet<>();
 
-ConstrainedPrioritySet(final BiFunction constraint,
+public ConstrainedPrioritySet(final BiFunction 
constraint,
final Function weight) {

Review Comment:
   nit: adjust parameter indentation



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final 
TaskInfo task,
 }
 return true;
 }
+
+private static Map 
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
 final Map kafkaStreamsAssignments) {
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set rackAwareAssignmentTags = new 
HashSet<>(getRackAwareAssignmentTags(applicationState));
+final TagStatistics tagStatistics = new 
TagStatistics(applicationState);
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clientsByUuid = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+final Function> clientTagGetter 
= createClientTagGetter(applicationState);
+
+final Map pendingStandbyTasksToClientId = new 
HashMap<>();
+for (final TaskId statefulTaskId : statefulTaskIds) {
+for (final KafkaStreamsAssignment assignment : 
clientsByUuid.values()) {
+if (assignment.tasks().containsKey(statefulTaskId)) {
+assignStandbyTasksToClientsWithDifferentTags(
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+statefulTaskId,
+assignment.processId(),
+rackAwareAssignmentTags,
+streamStates,
+kafkaStreamsAssignments,
+tasksToRemainingStandbys,
+tagStatistics.tagKeyToValues,
+tagStatistics.tagEntryToClients,
+pendingStandbyTasksToClientId,
+clientTagGetter
+);
+}
+}
+}
+
+if (!tasksToRemainingStandbys.isEmpty()) {
+assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid,
+numStandbyReplicas,
+standbyTaskClientsByTaskLoad,
+tasksToRemainingStandbys);
+}
+
+return kafkaStreamsAssignments;
+}
+
+private static Map 
loadBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
  final Map kafkaStreamsAssignments) 
{
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clients = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+entry -> entry.getKey().id(),
+Map.Entry::getValue
+));
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619884277


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   Interesting… So you’re fine with adding those names to the public interface 
(what the end user really see) but not to follow the same naming scheme for the 
internal classes (that the end user don’t see at all). In my opinion, it makes 
sense to align as it will make it more coherent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619875174


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -18,29 +18,18 @@
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with

Review Comment:
   I feel like homogeneous and heterogeneous are harder names from a user pov, 
I think we should keep it as is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-29 Thread via GitHub


gongxuanzhang commented on PR #16097:
URL: https://github.com/apache/kafka/pull/16097#issuecomment-2138615902

   I revised this PR.It add spotless gradle plugin and mini configuration.
   It contains two part.
   1.  checkstyle import order rule. But suppress all module. you can change 
`checkstyle/suppressions.xml` open  module check If you want 
   2.  auto format import order. Again, I did not open in any module. If you 
want open the auto format. Just add a module name into `build.gradle 
#spotlessApplyModules`   like `def spotlessApplyModules = ['core']` .then run 
`./gradlew spotlessApply` you will get formated code module
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-05-29 Thread via GitHub


github-actions[bot] commented on PR #15265:
URL: https://github.com/apache/kafka/pull/15265#issuecomment-2138611194

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619777550


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -244,18 +274,117 @@ public static Map 
optimizeRackAwareStandbyTas
 );
 LOG.info("Assignment before standby task optimization has cost {}", 
initialCost);
 
-throw new UnsupportedOperationException("Not yet Implemented.");
+final MoveStandbyTaskPredicate moveablePredicate = 
getStandbyTaskMovePredicate(applicationState);
+final BiFunction> getMovableTasks = (source, destination) -> {
+return source.tasks().values().stream().filter(task -> task.type() 
== AssignedTask.Type.STANDBY)
+.filter(task -> {
+return !destination.tasks().containsKey(task.id());
+})
+.filter(task -> {
+final KafkaStreamsState sourceState = 
kafkaStreamsStates.get(source.processId());
+final KafkaStreamsState destinationState = 
kafkaStreamsStates.get(source.processId());
+return moveablePredicate.canMoveStandbyTask(sourceState, 
destinationState, task.id(), kafkaStreamsAssignments);
+})
+.map(AssignedTask::id)
+.sorted()
+.collect(Collectors.toList());
+};
+
+final long startTime = System.currentTimeMillis();
+boolean taskMoved = true;
+int round = 0;
+final RackAwareGraphConstructor 
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+taskMoved = false;
+round++;
+for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+final UUID clientId1 = clientIds.get(i);
+final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+final UUID clientId2 = clientIds.get(i);
+final KafkaStreamsAssignment clientState2 = 
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+final String rack1 = 
clientRacks.get(clientState1.processId().id()).get();
+final String rack2 = 
clientRacks.get(clientState2.processId().id()).get();
+// Cross rack traffic can not be reduced if racks are the 
same
+if (rack1.equals(rack2)) {
+continue;
+}
+
+final List movable1 = 
getMovableTasks.apply(clientState1, clientState2);
+final List movable2 = 
getMovableTasks.apply(clientState2, clientState1);
+
+// There's no needed to optimize if one is empty because 
the optimization
+// can only swap tasks to keep the client's load balanced
+if (movable1.isEmpty() || movable2.isEmpty()) {
+continue;
+}
+
+final List taskIdList = 
Stream.concat(movable1.stream(),
+movable2.stream())

Review Comment:
   I noticed this is messed up in the original code too, but can you fix the 
indentation/remove the line break (just here, no need to fix it in the 
RackAwareTaskAssignor too)



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -244,18 +274,117 @@ public static Map 
optimizeRackAwareStandbyTas
 );
 LOG.info("Assignment before standby task optimization has cost {}", 
initialCost);
 
-throw new UnsupportedOperationException("Not yet Implemented.");
+final MoveStandbyTaskPredicate moveablePredicate = 
getStandbyTaskMovePredicate(applicationState);
+final BiFunction> getMovableTasks = (source, destination) -> {
+return source.tasks().values().stream().filter(task -> task.type() 
== AssignedTask.Type.STANDBY)
+.filter(task -> {
+return !destination.tasks().containsKey(task.id());
+})
+.filter(task -> {
+final KafkaStreamsState sourceState = 
kafkaStreamsStates.get(source.processId());
+final KafkaStreamsState destinationState = 
kafkaStreamsStates.get(source.processId());
+return moveablePredicate.canMoveStandbyTask(sourceState, 
destinationState, task.id(), kafkaStreamsAssignments);
+})
+.map(AssignedTask::id)
+.sorted()
+.collect(Collectors.toList());
+};
+
+final long startTime = System.currentTimeMillis();
+boolean taskMoved = 

Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619732219


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -278,7 +278,7 @@ public void processOptimizedAssignments(final 
Map entry : 
optimizedAssignments.entrySet()) {
 final ProcessId processId = entry.getKey();
-final Set assignedTasks = 
optimizedAssignments.get(processId).assignment();
+final Set assignedTasks = new 
HashSet<>(optimizedAssignments.get(processId).tasks().values());

Review Comment:
   This can be a followup PR, but one of the nice things about making 
KafkaStreamsAssignment mutable is that we should be able to get rid of the 
`newAssignments` field altogether and stop converting back and forth between 
the KafkaStreamsAssignment and raw Sets so that we can add/remove tasks. 
   
   In addition to the obvious code simplification that should result from this 
change, it should save us a lot of copying things into various different data 
structures and reduce a lot of overhead



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java:
##
@@ -62,14 +64,24 @@ public static KafkaStreamsAssignment of(final ProcessId 
processId, final Set assignment,
final Optional 
followupRebalanceDeadline) {
+this(
+processId,
+assignment.stream().collect(Collectors.toMap(AssignedTask::id, 
Function.identity())),
+followupRebalanceDeadline
+);
+}
+
+private KafkaStreamsAssignment(final ProcessId processId,

Review Comment:
   nit: seems kind of unnecessary to introduce yet another constructor for 
this, we can just inline things. ie
   
   ```
   this.assignment = 
assignment.stream().collect(Collectors.toMap(AssignedTask::id, 
Function.identity()));
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java:
##
@@ -83,16 +95,10 @@ public ProcessId processId() {
 
 /**
  *
- * @return a set of assigned tasks that are part of this {@code 
KafkaStreamsAssignment}
+ * @return a read-only set of assigned tasks that are part of this {@code 
KafkaStreamsAssignment}
  */
-public Set assignment() {
-// TODO change assignment to return a map so we aren't forced to copy 
this into a Set
-return new HashSet<>(assignment.values());
-}
-
-// TODO: merge this with #assignment by having it return a Map
-public Set assignedTaskIds() {
-return assignment.keySet();
+public Map tasks() {
+return unmodifiableMap(assignment);

Review Comment:
   nit: should probably changed the field name too (ie `assignment` --> `tasks`)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java:
##
@@ -489,10 +489,10 @@ public SortedMap> 
taskIdsByPreviousConsumer() {
 }
 
 public void setAssignedTasks(final KafkaStreamsAssignment assignment) {
-final Set activeTasks = assignment.assignment().stream()
+final Set activeTasks = assignment.tasks().values().stream()
 .filter(task -> task.type() == 
ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id)
 .collect(Collectors.toSet());
-final Set standbyTasks = assignment.assignment().stream()
+final Set standbyTasks = assignment.tasks().values().stream()
 .filter(task -> task.type() == 
STANDBY).map(KafkaStreamsAssignment.AssignedTask::id)
 .collect(Collectors.toSet());
 assignedActiveTasks.taskIds(activeTasks);

Review Comment:
   nit: I keep getting confused by this and then realizing it's just a 
poorly-named method, can you rename it to `#setTaskIds` or something like that?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -785,12 +789,13 @@ private UserTaskAssignmentListener 
assignTasksToClients(final Cluster fullMetada
 );
 final org.apache.kafka.streams.processor.assignment.TaskAssignor 
assignor = userTaskAssignor.get();
 final TaskAssignment taskAssignment = 
assignor.assign(applicationState);
-processStreamsPartitionAssignment(clientMetadataMap, 
taskAssignment);
 final AssignmentError assignmentError = 
validateTaskAssignment(applicationState, taskAssignment);
-userTaskAssignmentListener = (GroupAssignment assignment, 
GroupSubscription subscription) -> {
+processStreamsPartitionAssignment(clientMetadataMap, 
taskAssignment);
+userTaskAssignmentListener = (assignment, subscription) -> {
 assignor.onAssignmentComputed(assignment, subscription, 
assignmentError);
 };

Review Comment:
   Really doesn't matter, this is 

Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]

2024-05-29 Thread via GitHub


showuon commented on code in PR #16085:
URL: https://github.com/apache/kafka/pull/16085#discussion_r1619753582


##
core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala:
##
@@ -238,5 +245,25 @@ class LogSegmentsTest {
 assertEquals(Int.MaxValue, LogSegments.sizeInBytes(asList(logSegment)))
 assertEquals(largeSize, LogSegments.sizeInBytes(asList(logSegment, 
logSegment)))
 assertTrue(UnifiedLog.sizeInBytes(asList(logSegment, logSegment)) > 
Int.MaxValue)
+
+val logSegments: LogSegments = new LogSegments(topicPartition)
+logSegments.add(logSegment)
+assertEquals(Int.MaxValue, logSegments.sizeInBytes())
+  }
+
+  @Test
+  def testUpdateDir(): Unit = {
+val seg1 = createSegment(1)

Review Comment:
   seg1 should be closed.



##
core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala:
##
@@ -105,8 +109,6 @@ class LogSegmentsTest {
 assertFalse(segments.nonEmpty)
 assertEquals(0, segments.numberOfSegments)
 assertFalse(segments.contains(offset1))
-
-segments.close()

Review Comment:
   You're right. So it is a bug here. Now, I think what we have to do is 
manually close seg1, seg2, seg3 here, otherwise, there will be resource leak.



##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -632,6 +632,15 @@ class LogSegmentTest {
 Utils.delete(tempDir)
   }
 
+  @Test
+  def testGetFirstBatchTimestamp(): Unit = {
+val segment = createSegment(1)

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Make VoterSetHistoryTest.testAddAt make sense [kafka]

2024-05-29 Thread via GitHub


dengziming merged PR #16104:
URL: https://github.com/apache/kafka/pull/16104


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16639: AsyncKafkaConsumer#close dose not send heartbeat to leave group. [kafka]

2024-05-29 Thread via GitHub


frankvicky commented on code in PR #16017:
URL: https://github.com/apache/kafka/pull/16017#discussion_r1619740146


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -752,6 +752,26 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 assertEquals(1, result.unsentRequests.size(), "Fenced member should 
resume heartbeat after transitioning to JOINING");
 }
 
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {
+mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
+
+membershipManager.leaveGroup();
+assertTrue(membershipManager.isLeavingGroup());

Review Comment:
   This makes sense. In the unit test for 'HeartBratRequestManager,' we should 
focus solely on its functionality.
   I will remove this line. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-29 Thread via GitHub


FrankYang0529 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1619707209


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -50,14 +51,13 @@ class ConsumerGroupCommandTestUtils {
 private ConsumerGroupCommandTestUtils() {
 }
 
-static List generator() {
+static List generator(boolean onlyConsumerGroupCoordinator) 
{

Review Comment:
   Updated it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: migrate DescribeConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-29 Thread via GitHub


FrankYang0529 commented on PR #15908:
URL: https://github.com/apache/kafka/pull/15908#issuecomment-2138512364

   > > That is why testing time takes longer. In my latest update, I change to 
run following cases. Also, in old framework, we started KRAFT server with 
group.coordinator.new.enable=true twice. One for classic group protocol and 
another for consumer group protocol. In new framework, we can get supported 
group protocol by ClusterInstance#supportedGroupProtocols, so I use one KRAFT 
server with group.coordinator.new.enable=true to run two protocols. We can 
reduce time to setup one KRAFT cluster. I hope this change can reduce overall 
testing time. Thanks.
   > > (ZK / KRAFT servers) with (group.coordinator.new.enable=false) with 
(classic group protocol) = 2 cases
   > > (KRAFT servers) with (group.coordinator.new.enable=true) with (classic / 
consumer group protocols) = 2 cases
   > 
   > agree. could this PR include this change?
   
   Yes, I already changed it. Please take a look for 
`DescribeConsumerGroupTest#generator`. Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-29 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16790.
---
Fix Version/s: 3.8.0
   3.7.1
   Resolution: Fixed

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- 

[jira] [Commented] (KAFKA-16858) Flatten SMT throws NPE

2024-05-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850530#comment-17850530
 ] 

Greg Harris commented on KAFKA-16858:
-

Thanks so much for the additional context [~undone] this is a very odd bug.

I managed to reproduce a Flatten NPE with mutable input objects rather than 
mocking: 

 
{noformat}
xformValue.configure(Collections.emptyMap());

Schema innerSchema = SchemaBuilder.struct().optional();
Struct innerStruct = new Struct(innerSchema);
AtomicReference valueSchema = new AtomicReference<>(innerSchema);
Schema arraySchema = SchemaBuilder.array(new ConnectSchema(Schema.Type.ARRAY) {
@Override
public Schema valueSchema() {
return valueSchema.get();
}
});
Schema schema = SchemaBuilder.struct().field("field", arraySchema);
Struct value = new Struct(schema).put("field", 
Collections.singletonList(Collections.singletonList(innerStruct)));
valueSchema.set(null);

SourceRecord record = new SourceRecord(null, null, "topic", 0, schema, value);
xformValue.apply(record);{noformat}
It throws this error:
{noformat}
Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because "schema" is 
null
java.lang.NullPointerException: Cannot invoke 
"org.apache.kafka.connect.data.Schema.name()" because "schema" is null
    at 
org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)
    at 
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)
    at 
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at 
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
    at 
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at 
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
    at 
org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:250)
    at 
org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:164)
    at 
org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:79){noformat}
This is different from your stacktrace in two ways:
 # Instead of 4 buildWithSchema calls, there's only 1. This is because my test 
Struct is nested less deeply than your Struct, and a deeper test case behaves 
almost identically.
 # Instead of 3 validateValue calls, there's 5. This is because I'm using an a 
nested array "[[Struct]]" instead of your singly-nested array "[Struct]". This 
one is a bit more important, because the reproduction case doesn't work for 
singly-nested arrays. The difference is that a singly-nested array has it's 
valueSchema evaluated during `buildUpdatedSchema`, and the doubly-nested array 
has it's valueSchema evaluated during `buildWithSchema`. When the null 
valueSchema is evaluated during buildUpdatedSchema, it throws this exception 
instead:

 
{noformat}
valueSchema cannot be null.
org.apache.kafka.connect.errors.SchemaBuilderException: valueSchema cannot be 
null.
    at 
app//org.apache.kafka.connect.data.SchemaBuilder.array(SchemaBuilder.java:363)
    at 
app//org.apache.kafka.connect.transforms.util.SchemaUtil.copySchemaBasics(SchemaUtil.java:29)
    at 
app//org.apache.kafka.connect.transforms.Flatten.convertFieldSchema(Flatten.java:225)
    at 
app//org.apache.kafka.connect.transforms.Flatten.buildUpdatedSchema(Flatten.java:201)
    at 
app//org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:156)
    at 
app//org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:79){noformat}
I was unable to reproduce this with non-mutable schemas, they trigger the NPE 
too early while the input Struct is being constructed.

Some follow-ups:
 * Are you able to provide an anonymized form of your schema directly, rather 
than just a high-level "Array of Structs"? I'm wondering if your schema is 
capable of triggering the use of the mutable SchemaWrapper 
[https://github.com/confluentinc/schema-registry/blob/7b886f309c83041d4f2a5b41b5910f3b8002413a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java#L1779]
 inside the ProtobufConverter.
 * I don't have an explanation of how this can happen for empty and non-present 
arrays, as it looks like validateValue(ConnectSchema:255) can only be triggered 
by non-empty lists.
 * w.r.t. the variable validateValue depth: Are you saying that in _error 
cases_ the recursion depth is unpredictable, or in general? The validateValue 
should be called at every or almost every location in the tree of values, so I 
would expect to see lots of different recursion depths. Maybe you can share 
some more stacktraces as examples.
 * So far in this investigation, I'm trying to find the source of the null in 
hopes that we can prevent it, and get 

Re: [PR] KAFKA-16790: Update RemoteLogManager configuration in broker server [kafka]

2024-05-29 Thread via GitHub


showuon merged PR #16005:
URL: https://github.com/apache/kafka/pull/16005


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]

2024-05-29 Thread via GitHub


showuon commented on PR #16005:
URL: https://github.com/apache/kafka/pull/16005#issuecomment-2138452239

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Improve producer ID expiration performance [kafka]

2024-05-29 Thread via GitHub


jolshan merged PR #16075:
URL: https://github.com/apache/kafka/pull/16075


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-29 Thread via GitHub


jolshan opened a new pull request, #16130:
URL: https://github.com/apache/kafka/pull/16130

   As per KIP-1022, we will rename the unstable metadata versions enabled 
config to support all feature versions.
   
   Features is also updated to return latest production and latest testing 
versions of each feature.
   
   A feature is production ready when the corresponding metadata version 
(bootstrapMetadataVersion) is production ready.
   
   Adds tests for the feature usage of the unstableFeatureVersionsEnabled config


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan merged PR #15685:
URL: https://github.com/apache/kafka/pull/15685


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2138416302

   Test failures are unrelated. Merging  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [WIP] [kafka]

2024-05-29 Thread via GitHub


apourchet opened a new pull request, #16129:
URL: https://github.com/apache/kafka/pull/16129

   This fills in the implementation details of the standby task assignment 
utility functions within TaskAssignmentUtils.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]

2024-05-29 Thread via GitHub


ableegoldman merged PR #16033:
URL: https://github.com/apache/kafka/pull/16033


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on PR #16033:
URL: https://github.com/apache/kafka/pull/16033#issuecomment-2138398308

   Test failures are unrelated, merging to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1619561495


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -16,78 +16,395 @@
  */
 package org.apache.kafka.streams.processor.assignment;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.SortedSet;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.internals.assignment.Graph;
+import 
org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
+import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
+import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory;
+import org.apache.kafka.streams.StreamsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A set of utilities to help implement task assignment via the {@link 
TaskAssignor}
  */
 public final class TaskAssignmentUtils {
+private static final Logger LOG = 
LoggerFactory.getLogger(TaskAssignmentUtils.class);
+
+private TaskAssignmentUtils() {}
+
 /**
- * Assign standby tasks to KafkaStreams clients according to the default 
logic.
- * 
- * If rack-aware client tags are configured, the rack-aware standby task 
assignor will be used
+ * Return a "no-op" assignment that just copies the previous assignment of 
tasks to KafkaStreams clients
  *
- * @param applicationStatethe metadata and other info describing 
the current application state
- * @param kafkaStreamsAssignments the current assignment of tasks to 
KafkaStreams clients
+ * @param applicationState the metadata and other info describing the 
current application state
  *
- * @return a new map containing the mappings from KafkaStreamsAssignments 
updated with the default
- * standby assignment
+ * @return a new map containing an assignment that replicates exactly the 
previous assignment reported
+ * in the applicationState
  */
-public static Map 
defaultStandbyTaskAssignment(
-final ApplicationState applicationState,
-final Map kafkaStreamsAssignments
-) {
-throw new UnsupportedOperationException("Not Implemented.");
+public static Map 
identityAssignment(final ApplicationState applicationState) {
+final Map assignments = new 
HashMap<>();
+applicationState.kafkaStreamsStates(false).forEach((processId, state) 
-> {
+final Set tasks = new HashSet<>();
+state.previousActiveTasks().forEach(taskId -> {
+tasks.add(new AssignedTask(taskId,
+AssignedTask.Type.ACTIVE));
+});
+state.previousStandbyTasks().forEach(taskId -> {
+tasks.add(new AssignedTask(taskId,
+AssignedTask.Type.STANDBY));
+});
+
+final KafkaStreamsAssignment newAssignment = 
KafkaStreamsAssignment.of(processId, tasks);
+assignments.put(processId, newAssignment);
+});
+return assignments;
 }
 
 /**
- * Optimize the active task assignment for rack-awareness
+ * Optimize active task assignment for rack awareness. This optimization 
is based on the
+ * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG 
trafficCost}
+ * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG 
nonOverlapCost}
+ * configs which balance cross rack traffic minimization and task movement.
+ * Setting {@code trafficCost} to a larger number reduces the overall 
cross rack traffic of the resulting
+ * assignment, but can increase the number of tasks shuffled around 
between clients.
+ * Setting {@code nonOverlapCost} to a larger number increases the 
affinity of tasks to their intended client
+ * and reduces the amount by which the rack-aware optimization can shuffle 
tasks around, at the cost of higher
+ * cross-rack traffic.
+ * In an extreme case, if we set {@code nonOverlapCost} to 0 and @{code 
trafficCost} to a positive value,
+ * the resulting assignment will have an absolute minimum of cross rack 
traffic. If we set {@code trafficCost} to 0,
+ * and {@code nonOverlapCost} to a positive value, the resulting 
assignment will be identical to the input assignment.
+ * 
+ * Note: this method will modify the input {@link KafkaStreamsAssignment} 
objects and return the same map.
+ * It does not make a copy of the map or the KafkaStreamsAssignment 
objects.
+ * 
+ * This method 

Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]

2024-05-29 Thread via GitHub


muralibasani commented on PR #16005:
URL: https://github.com/apache/kafka/pull/16005#issuecomment-2138352697

   @nikramakrishnan @showuon there are some test failures but not sure if 
related to ours ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850511#comment-17850511
 ] 

Chia-Ping Tsai commented on KAFKA-15305:


[~kirktrue] As you unassign this ticket, I take over this one and will file PR 
according to above solution.

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on PR #15986:
URL: https://github.com/apache/kafka/pull/15986#issuecomment-2138338743

   Thanks for the PR, @jsancio . A few meta-comments:
   
   - I'm not sure I see the benefit to changing the `toString` functions to use 
`String.format`. It seems more brittle than the simple string concatenation 
approach. Unless you want to print something in a specific way, like 
`String.format("%02d", myInt)`. But that isn't the case here.
   
   - Changing the SharedServer constructor is a huge pain and generates a lot 
of churn. Since the thing you're passing comes from the static configuration 
anyway, let's not do that.
   
   - I think we should try to avoid doing too much validation in `KafkaConfig`. 
Things like hostnames should be resolved when we actually need them. It would 
be silly for one unresolvable hostname to make configuration validation fail, 
and hence fail the whole kcontroller startup process.
   
   - I don't think we want to change all of the tests to use 
`controller.quorum.bootstrap.servers`. We still need to support the old 
configuration. Let's make it so that tests using IBP_3_8_IV0 or newer use the 
new thing, and tests using an older MV use the old configuration. That way we 
will have good coverage.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]

2024-05-29 Thread via GitHub


apourchet commented on code in PR #16123:
URL: https://github.com/apache/kafka/pull/16123#discussion_r1619452846


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster 
fullMetadata,
 
 final 
Optional 
userTaskAssignor =
 userTaskAssignorSupplier.get();
+UserTaskAssignmentListener userTaskAssignmentListener = 
(GroupAssignment assignment, GroupSubscription subscription) -> { };

Review Comment:
   You can't change it once you make it final.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-29 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-15305:
--

Assignee: Chia-Ping Tsai

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215
 ] 

Kirk True edited comment on KAFKA-16792 at 5/29/24 10:04 PM:
-

These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords
 - testResetToCommittedOffset


was (Author: kirktrue):
These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)
 - testFetchStableOffsetThrowInPosition (wasn't in the above list)

The following still don't work:
 - testCurrentLag
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords
 - testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215
 ] 

Kirk True edited comment on KAFKA-16792 at 5/29/24 10:03 PM:
-

These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)
 - testFetchStableOffsetThrowInPosition (wasn't in the above list)

The following still don't work:
 - testCurrentLag
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords
 - testResetToCommittedOffset


was (Author: kirktrue):
These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)
 - testFetchStableOffsetThrowInPosition (wasn't in the above list)

The following still don't work:
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords
 - testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619507872


##
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##
@@ -156,22 +156,24 @@ public boolean canGrantVote(ReplicaKey candidateKey, 
boolean isLogUpToDate) {
 log.debug(
 "Rejecting vote request from candidate ({}) since we already have 
a leader {} in epoch {}",
 candidateKey,
-leaderId(),
+leader,
 epoch
 );
 return false;
 }
 
 @Override
 public String toString() {
-return "FollowerState(" +
-"fetchTimeoutMs=" + fetchTimeoutMs +
-", epoch=" + epoch +
-", leaderId=" + leaderId +
-", voters=" + voters +
-", highWatermark=" + highWatermark +
-", fetchingSnapshot=" + fetchingSnapshot +
-')';
+return String.format(

Review Comment:
   Hmm, I'm not sure I see a lot of benefit to this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506788


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}

Review Comment:
   It would be better to do this in a separate cleanup PR.



##
core/src/test/resources/log4j.properties:
##
@@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m 
(%c:%L)%n
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
 
-

Review Comment:
   It would be better to do this in a separate cleanup PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-29 Thread via GitHub


soarez commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1619521840


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -105,6 +108,49 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map props) {
+List configValues = super.validate(props).configValues();
+String emitCheckpointsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT));
+String syncGroupOffsetsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));
+String emitOffsetSyncsValue = 
Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT));
+
+if ("false".equals(emitCheckpointsValue) && 
"false".equals(syncGroupOffsetsValue)) {
+ConfigValue syncGroupOffsets = configValues.stream().filter(prop 
-> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+ConfigValue emitCheckpoints = configValues.stream().filter(prop -> 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+String errorMessage = "MirrorCheckpointConnector can't run with 
both" +
+MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false";
+syncGroupOffsets.addErrorMessage(errorMessage);
+emitCheckpoints.addErrorMessage(errorMessage);
+}
+if ("false".equals(emitOffsetSyncsValue) && 
("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) {
+ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> 
MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't 
run while MirrorSourceConnector configured with" +
+MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to 
false");
+}
+return new Config(configValues);

Review Comment:
   Thanks for the explanation Omnia. That makes sense. Could 
`MirrorCheckpointConfig` have a new method performs these validations, and we 
call that method from `MirrorCheckPointconnector#validate`? Maybe the method 
could return `Optional` if there is any validation issue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619520419


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -55,15 +57,41 @@ final public class VoterSet {
 }
 
 /**
- * Returns the socket address for a given voter at a given listener.
+ * Returns the node information for all the given voter ids and listener.
  *
- * @param voter the id of the voter
- * @param listener the name of the listener
- * @return the socket address if it exists, otherwise {@code 
Optional.empty()}
+ * @param voterIds the ids of the voters
+ * @param listenerName the name of the listener
+ * @return the node information for all of the voter ids
+ * @throws IllegalArgumentException if there are missing endpoints
  */
-public Optional voterAddress(int voter, String 
listener) {
-return Optional.ofNullable(voters.get(voter))
-.flatMap(voterNode -> voterNode.address(listener));
+public Set voterNodes(Stream voterIds, ListenerName 
listenerName) {

Review Comment:
   This seems like the wrong behavior to me for the case where one of the 
voters doesn't have a specific listener, but the other ones do. Wouldn't we 
want to continue to use the other, working voters in that case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15305:
-

Assignee: (was: Kirk True)

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-29 Thread via GitHub


soarez commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1619486214


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -73,6 +73,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_TOPIC_CONFIG_PREFIX;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX;

Review Comment:
   This constant is declared in MirrorConnectorConfig, so the static import to 
refer to that class directly



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##
@@ -50,6 +50,40 @@ public class MirrorCheckpointConnectorTest {
 private static final String CONSUMER_GROUP = "consumer-group-1";
 private static final Map SOURCE_OFFSET = 
MirrorUtils.wrapOffset(0);
 
+@Test
+public void testEmitCheckpointsAndSyncGroupOffsetsBothDisabled() {
+// disable the checkpoint emission
+MirrorCheckpointConfig config = new MirrorCheckpointConfig(
+makeProps("emit.checkpoints.enabled", "false",
+"sync.group.offsets.enabled", "false"));
+
+Set knownConsumerGroups = new HashSet<>();
+knownConsumerGroups.add(CONSUMER_GROUP);
+// MirrorCheckpointConnector as minimum to run taskConfig()
+MirrorCheckpointConnector connector = new 
MirrorCheckpointConnector(knownConsumerGroups,
+config);
+List> output = connector.taskConfigs(1);
+// expect no task will be created
+assertEquals(0, output.size(), "MirrorCheckpointConnector not 
disabled");
+}
+
+@Test
+public void testEmitOffsetSyncsDisabled() {
+// disable the checkpoint emission
+MirrorCheckpointConfig config = new MirrorCheckpointConfig(
+makeProps("emit.checkpoints.enabled", "false",
+MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, 
"false"));
+
+Set knownConsumerGroups = new HashSet<>();
+knownConsumerGroups.add(CONSUMER_GROUP);
+// MirrorCheckpointConnector as minimum to run taskConfig()
+MirrorCheckpointConnector connector = new 
MirrorCheckpointConnector(knownConsumerGroups,
+config);
+List> output = connector.taskConfigs(1);
+// expect no task will be created
+assertEquals(0, output.size(), "MirrorCheckpointConnector not 
disabled");

Review Comment:
   Should this be extracted into a common method? e.g. 
`assertMirrorCheckpointConnectorDisabled(config)`



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##
@@ -437,6 +437,55 @@ public void testSendSyncEvent() {
 verify(producer, times(5)).send(any(), any());
 }
 
+@Test
+public void testDisableEmitOffsetSync() {
+byte[] recordKey = "key".getBytes();
+byte[] recordValue = "value".getBytes();
+int maxOffsetLag = 50;
+int recordPartition = 0;
+int recordOffset = 0;
+int metadataOffset = 100;
+String topicName = "topic";
+String sourceClusterName = "sourceCluster";
+
+RecordHeaders headers = new RecordHeaders();
+ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+
+@SuppressWarnings("unchecked")
+KafkaConsumer consumer = mock(KafkaConsumer.class);
+@SuppressWarnings("unchecked")
+KafkaProducer producer = mock(KafkaProducer.class);
+MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+Semaphore outstandingOffsetSyncs = new Semaphore(1);
+PartitionState partitionState = new PartitionState(maxOffsetLag);
+Map partitionStates = new HashMap<>();
+
+MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
+replicationPolicy, maxOffsetLag, producer, 
outstandingOffsetSyncs, partitionStates, topicName, false);
+
+SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
+recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
+
+TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
+partitionStates.put(sourceTopicPartition, partitionState);
+RecordMetadata recordMetadata = new 
RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+
+ArgumentCaptor producerCallback = 
ArgumentCaptor.forClass(Callback.class);
+when(producer.send(any(), 
producerCallback.capture())).thenAnswer(mockInvocation -> {
+producerCallback.getValue().onCompletion(null, 

Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619518236


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -17,108 +17,296 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
+/**
+ * The request manager keeps tracks of the connection with remote replicas.
+ *
+ * When sending a request update this type by calling {@code 
onRequestSent(Node, long, long)}. When
+ * the RPC returns a response, update this manager with {@code 
onResponseResult(Node, long, boolean, long)}.
+ *
+ * Connections start in the ready state ({@code isReady(Node, long)} returns 
true).
+ *
+ * When a request times out or completes successfully the collection will 
transition back to the
+ * ready state.
+ *
+ * When a request completes with an error it still transition to the backoff 
state until
+ * {@code retryBackoffMs}.
+ */
 public class RequestManager {
-private final Map connections = new HashMap<>();
-private final List voters = new ArrayList<>();
+private final Map connections = new HashMap<>();

Review Comment:
   I don't understand why you are changing this to treat node ID as a string. 
An Int seems better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-05-29 Thread via GitHub


chia7712 commented on PR #16116:
URL: https://github.com/apache/kafka/pull/16116#issuecomment-2138325276

   @OmniaGM Could you please fix the conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Delete KafkaSecurityConfigs class [kafka]

2024-05-29 Thread via GitHub


chia7712 merged PR #16113:
URL: https://github.com/apache/kafka/pull/16113


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16864) Copy on write in the Optimized Uniform Assignor

2024-05-29 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16864:


 Summary: Copy on write in the Optimized Uniform Assignor
 Key: KAFKA-16864
 URL: https://issues.apache.org/jira/browse/KAFKA-16864
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: David Jacot


An optimization for the uniform (homogenous) assignor by avoiding creating a 
copy of all the assignments. Instead, the assignor creates a copy only if the 
assignment is updated. It is a sort of copy-on-write. This change reduces the 
overhead of the TargetAssignmentBuilder when ran with the uniform (homogenous) 
assignor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619514531


##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -199,6 +206,34 @@ private static Map 
parseVoterConnections(
 return voterMap;
 }
 
+public static InetSocketAddress parseBootstrapServer(String 
bootstrapServer) {
+String host = Utils.getHost(bootstrapServer);
+if (host == null) {
+throw new ConfigException(
+String.format(
+"Failed to parse host name from {} for the configuration 
{}. Each " +
+"entry should be in the form \"{host}:{port}\"",
+bootstrapServer,
+QUORUM_BOOTSTRAP_SERVERS_CONFIG
+)
+);
+}
+
+Integer port = Utils.getPort(bootstrapServer);
+if (port == null) {
+throw new ConfigException(
+String.format(
+"Failed to parse host port from {} for the configuration 
{}. Each " +
+"entry should be in the form \"{host}:{port}\"",
+bootstrapServer,
+QUORUM_BOOTSTRAP_SERVERS_CONFIG
+)
+);
+}
+
+return InetSocketAddress.createUnresolved(host, port);

Review Comment:
   I think it's a mistake to resolve hostnames as part of configuration 
validation. Just treat them as strings.
   
   Otherwise you could end up not being able to start Kafka because 1 out of 3 
hostnames in your bootstrap list could not be resolved (because of a DNS 
problem or whatever)
   
   Resolve a hostname when you're actually using it, not before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-29 Thread via GitHub


junrao commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2138319133

   @clolov Sorry to ping you again. The 3.8 branch is going to be cut by 
Friday. Will you still be able to complete this PR for 3.8.0?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619511561


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1635,24 +1693,29 @@ private Optional maybeHandleCommonResponse(
 }
 
 private void maybeTransition(
-OptionalInt leaderId,
+Optional leader,
 int epoch,
 long currentTimeMs
 ) {
+OptionalInt leaderId = OptionalInt.empty();

Review Comment:
   Again would be more clearly expressed with `?:` rather than mutating the 
OptionalInt reference



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619511354


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *  Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
-private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {

Review Comment:
   I had the same question, we might need it in the rack aware case for all the 
common methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619510911


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1593,6 +1644,13 @@ private Optional maybeHandleCommonResponse(
 int epoch,
 long currentTimeMs
 ) {
+Optional leader = Optional.empty();

Review Comment:
   This would be more clearly expressed as something like 
   ```
   Optional leader = leaderId.isPresent() ? 
 partitionState.lastVoterSet().voterNode(leaderId.getAsInt(), 
channel.listenerName()) : 
 Optional.empty();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] : Code Cleanup - Misc modules [kafka]

2024-05-29 Thread via GitHub


chia7712 commented on code in PR #16067:
URL: https://github.com/apache/kafka/pull/16067#discussion_r1619506272


##
core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java:
##
@@ -105,12 +106,12 @@ public KafkaPrincipal deserialize(byte[] bytes) throws 
SerializationException {
 UpdateMetadataBroker broker = new UpdateMetadataBroker()
 .setId(0)
 .setRack("rack")
-.setEndpoints(Arrays.asList(
-new UpdateMetadataRequestData.UpdateMetadataEndpoint()
-.setHost("broker0")
-.setPort(9092)
-.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-.setListener(plaintextListener.value())
+.setEndpoints(Collections.singletonList(
+new UpdateMetadataRequestData.UpdateMetadataEndpoint()

Review Comment:
   Could you please revert those unrelated changes to reduce the size of PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619507872


##
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##
@@ -156,22 +156,24 @@ public boolean canGrantVote(ReplicaKey candidateKey, 
boolean isLogUpToDate) {
 log.debug(
 "Rejecting vote request from candidate ({}) since we already have 
a leader {} in epoch {}",
 candidateKey,
-leaderId(),
+leader,
 epoch
 );
 return false;
 }
 
 @Override
 public String toString() {
-return "FollowerState(" +
-"fetchTimeoutMs=" + fetchTimeoutMs +
-", epoch=" + epoch +
-", leaderId=" + leaderId +
-", voters=" + voters +
-", highWatermark=" + highWatermark +
-", fetchingSnapshot=" + fetchingSnapshot +
-')';
+return String.format(

Review Comment:
   I'm not sure I see a lot of benefit to this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506788


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}

Review Comment:
   Do this in a separate cleanup PR.



##
core/src/test/resources/log4j.properties:
##
@@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m 
(%c:%L)%n
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
 
-

Review Comment:
   Do this in a separate cleanup PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-29 Thread via GitHub


cmccabe commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1619506439


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -94,6 +95,7 @@ class SharedServer(
   val time: Time,
   private val _metrics: Metrics,
   val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, 
InetSocketAddress]],
+  val bootstrapServers: JCollection[InetSocketAddress],

Review Comment:
   I don't see why this needs to be a new argument. We already have KafkaConfig 
passed in as an argument, and this just comes from 
`QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers)`.
   
   Adding a new argument here creates a lot of churn, which is a lot of work 
for merges and such.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-29 Thread via GitHub


OmniaGM commented on PR #15999:
URL: https://github.com/apache/kafka/pull/15999#issuecomment-2138308937

   I was hoping for it to land in 3.8.0 I'll raise a PR against 3.8 once we 
merge it into trunk. however It shouldn't be a blocker for 3.8


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-29 Thread via GitHub


chia7712 commented on PR #15830:
URL: https://github.com/apache/kafka/pull/15830#issuecomment-2138307998

   @pasharik could you please fix the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850505#comment-17850505
 ] 

Chia-Ping Tsai edited comment on KAFKA-15305 at 5/29/24 9:37 PM:
-

KAFKA-16639 needs this ticket to fix root cause. As this ticket described, 
`ConsumerNetworkThread` does not honor the close timeout. Even though we put a 
heartbeat request to leave group, `ConsumerNetworkThread` will move it to 
`NetworkClient` and then exit the waiting ...

It seems to me the simple solution is to add a method "hasInFlightRequests" to 
"networkClientDelegate", and then we change the while condition [0] from 
"timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to 
"timer.notExpired() && networkClientDelegate.hasInFlightRequests()".

{code:java}
boolean hasInFlightRequests() {
return client.hasInFlightRequests();
}
{code}

{code:java}
do {
networkClientDelegate.poll(timer.remainingMs(), 
timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && 
networkClientDelegate.hasInFlightRequests());
{code}

[~kirktrue] WDYT? 

[0] 
https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L301


was (Author: chia7712):
KAFKA-16639 needs this ticket to fix root cause. As this ticket described, 
`ConsumerNetworkThread` does not honor the close timeout. Even though we put a 
heartbeat request to leave group, `ConsumerNetworkThread` will move it to 
`NetworkClient` and then exit the waiting ...

It seems to me the simple solution is to add a method "hasInFlightRequests" to 
"networkClientDelegate", and then we change the while condition from 
"timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to 
"timer.notExpired() && networkClientDelegate.hasInFlightRequests()".

{code:java}
boolean hasInFlightRequests() {
return client.hasInFlightRequests();
}
{code}

{code:java}
do {
networkClientDelegate.poll(timer.remainingMs(), 
timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && 
networkClientDelegate.hasInFlightRequests());
{code}

[~kirktrue] WDYT? 



> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850506#comment-17850506
 ] 

Chia-Ping Tsai commented on KAFKA-16639:


KAFKA-15305 is the necessary cure for this issue. Otherwise, the heartbeat 
request may not be sent in closing. 

In this ticket we focus on fixing the "uncreated heartbeat request"

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850505#comment-17850505
 ] 

Chia-Ping Tsai commented on KAFKA-15305:


KAFKA-16639 needs this ticket to fix root cause. As this ticket described, 
`ConsumerNetworkThread` does not honor the close timeout. Even though we put a 
heartbeat request to leave group, `ConsumerNetworkThread` will move it to 
`NetworkClient` and then exit the waiting ...

It seems to me the simple solution is to add a method "hasInFlightRequests" to 
"networkClientDelegate", and then we change the while condition from 
"timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to 
"timer.notExpired() && networkClientDelegate.hasInFlightRequests()".

{code:java}
boolean hasInFlightRequests() {
return client.hasInFlightRequests();
}
{code}

{code:java}
do {
networkClientDelegate.poll(timer.remainingMs(), 
timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && 
networkClientDelegate.hasInFlightRequests());
{code}

[~kirktrue] WDYT? 



> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16529) Response handling and request sending for voters RPCs

2024-05-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16529:
---
Description: 
Implement response handling and request sending for the following RPCs:
 # Vote
 # BeginQuorumEpoch
 # EndQuorumEpoch
 # Fetch
 # FetchSnapshot

When loading the leader from the quorum-state file don't assume that the leader 
is in the voter set.

  was:
Implement response handling and request sending for the following RPCs:
 # Vote
 # BeginQuorumEpoch
 # Fetch

When loading the leader from the quorum-state file don't assume that the leader 
is in the voter set.


> Response handling and request sending for voters RPCs
> -
>
> Key: KAFKA-16529
> URL: https://issues.apache.org/jira/browse/KAFKA-16529
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> Implement response handling and request sending for the following RPCs:
>  # Vote
>  # BeginQuorumEpoch
>  # EndQuorumEpoch
>  # Fetch
>  # FetchSnapshot
> When loading the leader from the quorum-state file don't assume that the 
> leader is in the voter set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-29 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1619478117


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##
@@ -26,9 +26,9 @@
  */
 public interface GroupSpec {
 /**
- * @return Member metadata keyed by member Id.
+ * @return Member subscription metadata keyed by member Id.
  */
-Map members();
+Map memberSubscriptions();

Review Comment:
   discussed offline



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-29 Thread via GitHub


chia7712 commented on PR #15999:
URL: https://github.com/apache/kafka/pull/15999#issuecomment-2138240570

   just remind that today is the deadline for feature freeze. If this PR is 
necessary for 3.8.0, we need to cherry-pick to branch-3.8 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]

2024-05-29 Thread via GitHub


apourchet commented on code in PR #16123:
URL: https://github.com/apache/kafka/pull/16123#discussion_r1619452846


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster 
fullMetadata,
 
 final 
Optional 
userTaskAssignor =
 userTaskAssignorSupplier.get();
+UserTaskAssignmentListener userTaskAssignmentListener = 
(GroupAssignment assignment, GroupSubscription subscription) -> { };

Review Comment:
   You can't change it once you make it final.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-05-29 Thread via GitHub


loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##
@@ -288,7 +301,31 @@ private  void forwardInternal(final ProcessorNode child,
 final Record record) {
 setCurrentNode(child);
 
-child.process(record);
+try {

Review Comment:
   @cadonna Catching exceptions right here: 
https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847
 was our very first intention before we decided to move it to 
`ProcessorContextImpl#forwardInternal`.
   
   Unless we're wrong, we cannot get the precise node name where the exception 
occurred at `StreamTask#doProcess` level.
   
   Could we:
   - catch exceptions at `StreamTask#doProcess` level
   - catch exceptions at `ProcessorContextImpl#forwardInternal` level, but 
rather than rethrowing a `StreamsException` 
(https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320),
 it can throw a `new ChildNodeProcessingException(child.name, e)` caught and 
handled at StreamTask#doProcess level. 
   
   `ChildNodeProcessingException` would be a new exception acting as a wrapper 
of the root cause, that additionally provides the name of the child node in 
which an exception occurs... letting the first node know about something 
happened in one of its children.
   
   Hope it is clear, if it sounds like a plan to you, we can give a try to this 
impl
   
   _EDIT_:
   
   We might have to call `processingExceptionHandler#handle` in 
`ProcessorContextImpl#forwardInternal` to know if processing should FAIL or 
CONTINUE at this level. Otherwise, a `ChildNodeProcessingException` will be 
thrown in all cases, FAIL or CONTINUE will be computed in 
`StreamTask#doProcess`, but in case of CONTINUE, how to resume the processing 
(maybe possible if ChildNodeProcessingException provides the failed child node 
name, and the current state of the record)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]

2024-05-29 Thread via GitHub


chia7712 commented on PR #16085:
URL: https://github.com/apache/kafka/pull/16085#issuecomment-2138227399

   @brandboat Could you please check the build error?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-29 Thread via GitHub


junrao commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1619445831


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be copied per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = 
"remote.log.manager.copy.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote copy quota management. " +
+"The default value is 61, which means there are 60 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 
61;
+
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = 
"remote.log.manager.copy.quota.window.size.seconds";
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each 
sample for remote copy quota management. " +
+"The default value is 1 second.";
+public static final int 
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.fetch.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of 
bytes that can be fetched from remote storage to local storage per second. " +
+"This is a global limit for all the partitions that are being 
fetched from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be fetched per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP 
= "remote.log.manager.fetch.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote fetch quota management. " 
+
+"The default value is 11, which means there are 10 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM 
= 11;

Review Comment:
   If there is no good reach, perhaps it's better to use the same default 
window number for copy too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-29 Thread via GitHub


jeffkbkim commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619407866


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -53,8 +42,17 @@
  * The assignment builder prioritizes the properties in the following order:
  *  Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
-private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {

Review Comment:
   are we going to remove AbstractUniformAssignmentBuilder?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##
@@ -158,12 +159,11 @@ public void 
testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
 
 Map>> expectedAssignment = new 
HashMap<>();
 expectedAssignment.put(memberA, mkAssignment(
-mkTopicAssignment(topic1Uuid, 0, 2),
-mkTopicAssignment(topic3Uuid, 1)
+mkTopicAssignment(topic1Uuid, 0),
+mkTopicAssignment(topic3Uuid, 0, 1)
 ));
 expectedAssignment.put(memberB, mkAssignment(
-mkTopicAssignment(topic1Uuid, 1),
-mkTopicAssignment(topic3Uuid, 0)
+mkTopicAssignment(topic1Uuid, 1, 2)

Review Comment:
   I see some test cases where we have changed the expected assignment. why did 
it change?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map assignStickyPartitions(int minQuota) {
-Map potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int 

Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]

2024-05-29 Thread via GitHub


ableegoldman merged PR #16123:
URL: https://github.com/apache/kafka/pull/16123


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on PR #16123:
URL: https://github.com/apache/kafka/pull/16123#issuecomment-2138189584

   Test failures are unrelated, merging to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-29 Thread via GitHub


junrao merged PR #15889:
URL: https://github.com/apache/kafka/pull/15889


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmenComputed [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #16123:
URL: https://github.com/apache/kafka/pull/16123#discussion_r1619409187


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -720,7 +727,7 @@ private void checkAllPartitions(final Set 
allSourceTopics,
  * Assigns a set of tasks to each client (Streams instance) using the 
configured task assignor, and also
  * populate the stateful tasks that have been assigned to the clients
  */
-private void assignTasksToClients(final Cluster fullMetadata,
+private UserTaskAssignmentListener assignTasksToClients(final Cluster 
fullMetadata,

Review Comment:
   nit: fix formatting (indentation of the lower parameters)
   
   Obviously you can address this in PR 9, no need to rerun the build on this PR



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster 
fullMetadata,
 
 final 
Optional 
userTaskAssignor =
 userTaskAssignorSupplier.get();
+UserTaskAssignmentListener userTaskAssignmentListener = 
(GroupAssignment assignment, GroupSubscription subscription) -> { };
 if (userTaskAssignor.isPresent()) {
 final ApplicationState applicationState = buildApplicationState(
 taskManager.topologyMetadata(),
 clientMetadataMap,
 topicGroups,
 fullMetadata
 );
-final TaskAssignment taskAssignment = 
userTaskAssignor.get().assign(applicationState);
+final org.apache.kafka.streams.processor.assignment.TaskAssignor 
assignor = userTaskAssignor.get();
+final TaskAssignment taskAssignment = 
assignor.assign(applicationState);
 processStreamsPartitionAssignment(clientMetadataMap, 
taskAssignment);
+final AssignmentError assignmentError = 
validateTaskAssignment(applicationState, taskAssignment);
+userTaskAssignmentListener = (GroupAssignment assignment, 
GroupSubscription subscription) -> {
+assignor.onAssignmentComputed(assignment, subscription, 
assignmentError);
+};

Review Comment:
   nit: Streams coding style tip, use simplified lambdas, ie
   
   ```suggestion
   userTaskAssignmentListener = (assignment, subscription) -> 
assignor.onAssignmentComputed(assignment, subscription, assignmentError);
   ```
   
   (I'm actually surprised checkstyle isn't complaining about this. Your IDE is 
probably at least telling you to simplify it I would guess)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -768,15 +775,21 @@ private void assignTasksToClients(final Cluster 
fullMetadata,
 
 final 
Optional 
userTaskAssignor =
 userTaskAssignorSupplier.get();
+UserTaskAssignmentListener userTaskAssignmentListener = 
(GroupAssignment assignment, GroupSubscription subscription) -> { };

Review Comment:
   nit (address in PR 9): Streams coding style tip, prefer final variables 
whenever possible. So this would be like:
   ```
   final UserTaskAssignmentListener listener;
   if (userTaskAssignor.isPresent() {
   // do stuff
   listener = ...;
   else {
  // do other stuff
  listener = (a, s) -> { };
   }
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultApplicationState.java:
##
@@ -32,22 +33,29 @@
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 
-public class ApplicationStateImpl implements ApplicationState {
+public class DefaultApplicationState implements ApplicationState {
 
 private final AssignmentConfigs assignmentConfigs;
-private final Set tasks;
+private final Map tasks;
 private final Map clientStates;
 
-public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
-final Set tasks,
-final Map clientStates) {
+private final Map> 
cachedKafkaStreamStates;
+
+public DefaultApplicationState(final AssignmentConfigs assignmentConfigs,
+   final Set tasks,
+   final Map 
clientStates) {
 this.assignmentConfigs = assignmentConfigs;
-this.tasks = unmodifiableSet(tasks);
+this.tasks = 
unmodifiableMap(tasks.stream().collect(Collectors.toMap(TaskInfo::id, task -> 
task)));

Review Comment:
   We can avoid adding yet another `.stream().collect() ` by building up the 
map with the TaskIds as keys when we're building the `Set tasks` 
parameter in the caller of this constructor. 



##

Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-05-29 Thread via GitHub


loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##
@@ -288,7 +301,31 @@ private  void forwardInternal(final ProcessorNode child,
 final Record record) {
 setCurrentNode(child);
 
-child.process(record);
+try {

Review Comment:
   @cadonna Catching exceptions right here: 
https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847
 was our very first intention before we decided to move it to 
`ProcessorContextImpl#forwardInternal`.
   
   Unless we're wrong, we cannot get the precise node name where the exception 
occurred at `StreamTask#doProcess` level.
   
   Could we:
   - catch exceptions at `StreamTask#doProcess` level
   - catch exceptions at `ProcessorContextImpl#forwardInternal` level, but 
rather than rethrowing a `StreamsException` 
(https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320),
 it can throw a `new ChildNodeProcessingException(child.name, e)` caught and 
handled at StreamTask#doProcess level. 
   
   `ChildNodeProcessingException` would be a new exception acting as a wrapper 
of the root cause, that additionally provides the name of the child node in 
which an exception occurs... letting the first node know about something 
happened in one of its children.
   
   Hope it is clear, if it sounds like a plan to you, we can give a try to this 
impl



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-05-29 Thread via GitHub


loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##
@@ -288,7 +301,31 @@ private  void forwardInternal(final ProcessorNode child,
 final Record record) {
 setCurrentNode(child);
 
-child.process(record);
+try {

Review Comment:
   @cadonna Catching exceptions right here: 
https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847
 was our very first intention before we decided to move it to 
`ProcessorContextImpl#forwardInternal`.
   
   Unless we're wrong, we cannot get the precise node name where the exception 
occured at `StreamTask#doProcess` level.
   
   Could we:
   - catch exceptions at `StreamTask#doProcess` level
   - catch exceptions at `ProcessorContextImpl#forwardInternal` level, but 
rather than rethrowing a `StreamsException` 
(https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320),
 it can throw a `new ChildNodeProcessingException(child.name, e)` caught and 
handled at StreamTask#doProcess level. 
   
   `ChildNodeProcessingException` would be a new exception acting as a wrapper 
of the root cause, that additionally provides the name of the child node in 
which an exception occurs... letting the first node know about something 
happened in one of its children.
   
   Hope it is clear, if it sounds like a plan to you, we can give a try to this 
impl



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16105: Reassignment fix [kafka]

2024-05-29 Thread via GitHub


AnatolyPopov commented on code in PR #15165:
URL: https://github.com/apache/kafka/pull/15165#discussion_r1619366626


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -254,6 +259,61 @@ public void testCanProcessRecord() throws 
InterruptedException {
 assertEquals(3, handler.metadataCounter);
 }
 
+@Test
+public void testCanReprocessSkippedRecords() throws InterruptedException {
+final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 1));
+final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 3));
+assertEquals(partitioner.metadataPartition(tpId0), 
partitioner.metadataPartition(tpId1));
+assertNotEquals(partitioner.metadataPartition(tpId3), 
partitioner.metadataPartition(tpId0));
+
+final int metadataPartition = partitioner.metadataPartition(tpId0);
+final int anotherMetadataPartition = 
partitioner.metadataPartition(tpId3);
+
+// Mocking the consumer to be able to wait for the second reassignment
+doAnswer(invocation -> {

Review Comment:
   Totally agree about the mock-of-a-mock approach, I don't like it either. But 
hopefully it's good enough for now and it helps to focus on changes related 
specifically to this bug in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-29 Thread via GitHub


emitskevich-blp commented on PR #15889:
URL: https://github.com/apache/kafka/pull/15889#issuecomment-2138093800

   > Are the failed tests related to this PR, especially the following one?
   
   Not related, they are all flaky tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619346921


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-if (config.get.unstableMetadataVersionsEnabled) {
-  System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-} else {
-  throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-}
-  }
   val metaProperties = new MetaProperties.Builder().
 setVersion(MetaPropertiesVersion.V1).
 setClusterId(clusterId).
 setNodeId(config.get.nodeId).
 build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+  if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+metadataRecords,
+metadataVersion,
+featureNamesAndLevelsMap,
+Features.PRODUCTION_FEATURES.asScala.toList,
+!Option(namespace.getString("release_version")).isEmpty

Review Comment:
   ```
   val defaultValue = defaultVersionString match {
 case Some(versionString) => 
MetadataVersion.fromVersionString(versionString)
 case None => MetadataVersion.LATEST_PRODUCTION
   }
   
   val releaseVersionTag = Option(namespace.getString("release_version"))
   val featureTag = 
featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)
   
   (releaseVersionTag, featureTag) match {
 case (Some(_), Some(_)) => // We should throw an error before we hit 
this case, but include for completeness
   throw new IllegalArgumentException("Both --release_version and 
--feature were set. Only one of the two flags can be set.")
 case (Some(version), None) =>
   MetadataVersion.fromVersionString(version)
 case (None, Some(level)) =>
   MetadataVersion.fromFeatureLevel(level)
 case (None, None) =>
   defaultValue
   }
   ```



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-if (config.get.unstableMetadataVersionsEnabled) {
-  System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-} else {
-  throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-}
-  }
   val metaProperties = new MetaProperties.Builder().
 setVersion(MetaPropertiesVersion.V1).
 setClusterId(clusterId).
 setNodeId(config.get.nodeId).
 build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+  if (namespace.getString("release_version") != null && 
specifiedFeatures != 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-if (config.get.unstableMetadataVersionsEnabled) {
-  System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-} else {
-  throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-}
-  }
   val metaProperties = new MetaProperties.Builder().
 setVersion(MetaPropertiesVersion.V1).
 setClusterId(clusterId).
 setNodeId(config.get.nodeId).
 build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+  if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+metadataRecords,
+metadataVersion,
+featureNamesAndLevelsMap,
+Features.PRODUCTION_FEATURES.asScala.toList,
+!Option(namespace.getString("release_version")).isEmpty

Review Comment:
   We only use the passed in metadata version for defaults if --release-version 
is specified. If version default is specified, we don't use the replication 
configs.



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-if (config.get.unstableMetadataVersionsEnabled) {
-  System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-} else {
-  throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-}
-  }
   val metaProperties = new MetaProperties.Builder().
 setVersion(MetaPropertiesVersion.V1).
 setClusterId(clusterId).
 setNodeId(config.get.nodeId).
 build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+  if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+metadataRecords,
+metadataVersion,
+featureNamesAndLevelsMap,
+   

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-if (config.get.unstableMetadataVersionsEnabled) {
-  System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-} else {
-  throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-}
-  }
   val metaProperties = new MetaProperties.Builder().
 setVersion(MetaPropertiesVersion.V1).
 setClusterId(clusterId).
 setNodeId(config.get.nodeId).
 build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+  if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+metadataRecords,
+metadataVersion,
+featureNamesAndLevelsMap,
+Features.PRODUCTION_FEATURES.asScala.toList,
+!Option(namespace.getString("release_version")).isEmpty

Review Comment:
   We only use the passed in metadata version for defaults if --version-default 
is specified. If version default is specified, we don't use the replication 
configs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: make public the consumer group migration policy config [kafka]

2024-05-29 Thread via GitHub


dongnuo123 opened a new pull request, #16128:
URL: https://github.com/apache/kafka/pull/16128

   This patch expose the group coordinator config 
`CONSUMER_GROUP_MIGRATION_POLICY_CONFIG`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619332926


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-if (config.get.unstableMetadataVersionsEnabled) {
-  System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-} else {
-  throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-}
-  }
   val metaProperties = new MetaProperties.Builder().
 setVersion(MetaPropertiesVersion.V1).
 setClusterId(clusterId).
 setNodeId(config.get.nodeId).
 build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+  if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+metadataRecords,
+metadataVersion,
+featureNamesAndLevelsMap,
+Features.PRODUCTION_FEATURES.asScala.toList,
+!Option(namespace.getString("release_version")).isEmpty

Review Comment:
   Thanks for the reply. Got it now. We pass in 
INTER_BROKER_PROTOCOL_VERSION_CONFIG as the default when calling 
`getMetadataVersion`. But that config shouldn't impact the MV used for 
selecting other features. 
   
   ```
 val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
   
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-05-29 Thread via GitHub


loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619321278


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -764,6 +766,7 @@ public boolean process(final long wallClockTime) {
 
 // get the next record to process
 record = partitionGroup.nextRecord(recordInfo, wallClockTime);
+rawRecord = partitionGroup.rawHeadRecord();

Review Comment:
   We did not really consider it as it appears more impactful for us to add a 
new attribute to `ProcessorRecordContext` (more code/unit tests changes 
basically) but we can definitely reconsider it.
   
   Also we'll need to update the production exception handling 
(https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L223)
 accordingly to KIP-1033. Having the raw record in the `ProcessorRecordContext` 
would help us on this part. Let us try that!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-05-29 Thread via GitHub


loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619321278


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -764,6 +766,7 @@ public boolean process(final long wallClockTime) {
 
 // get the next record to process
 record = partitionGroup.nextRecord(recordInfo, wallClockTime);
+rawRecord = partitionGroup.rawHeadRecord();

Review Comment:
   We did not really consider it as it appears more impactful for us to add a 
new attribute to `ProcessorRecordContext` (more code/unit tests changes 
basically) but we can definitely reconsider it.
   
   Also we'll need to update the production exception handling 
(https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L223)
 accordingly to KIP-1033. Having the raw record in the `ProcessorRecordContext` 
would help us on this part. Let us try that!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-29 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619318104


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   It's a corner case I would say -- also, we do wrap a log of exception as 
StreamsException throughout the whole code base, and given that we wrap the 
original `ConfigException` inside the StreamsException we would not lose 
information. It's just the idea to simplify the code a little bit.
   
   Let's hear what @cadonna or @ableegoldman think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [Draft]Topic command integration test migrate to new test infra [kafka]

2024-05-29 Thread via GitHub


TaiJuWu opened a new pull request, #16127:
URL: https://github.com/apache/kafka/pull/16127

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set [kafka]

2024-05-29 Thread via GitHub


ahuang98 commented on code in PR #16079:
URL: https://github.com/apache/kafka/pull/16079#discussion_r1619298801


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -445,6 +463,27 @@ private boolean isVoter(int remoteNodeId) {
 return voterStates.containsKey(remoteNodeId);
 }
 
+// with Jose's changes this will probably make more sense as VoterSet
+private void updateVoterSet(Set lastVoterSet) {

Review Comment:
   the comment is basically saying `Set` will likely change to 
`VoterSet`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-05-29 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16863:
---

 Summary: Consider removing `default.` prefix for exception handler 
config
 Key: KAFKA-16863
 URL: https://issues.apache.org/jira/browse/KAFKA-16863
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams has a set of configs with `default.` prefix. The intent for the 
default-prefix is to make a distinction between, well the default, and in-place 
overwrites in the code. Eg, users can specify ts-extractors on a per-topic 
basis.

However, for the deserialization- and production-exception handlers, no such 
overwrites are possible, and thus, `default.` does not really make sense, 
because there is just one handler overall. Via KIP-1033 we added a new 
processing-exception handler w/o a default-prefix, too.

Thus, we should consider to deprecate the two existing configs names and add 
them back w/o the `default.` prefix.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set [kafka]

2024-05-29 Thread via GitHub


ahuang98 commented on code in PR #16079:
URL: https://github.com/apache/kafka/pull/16079#discussion_r1619301291


##
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##
@@ -103,189 +105,247 @@ public void testNonFollowerAcknowledgement() {
 assertThrows(IllegalArgumentException.class, () -> 
state.addAcknowledgementFrom(nonVoterId));
 }
 
-@Test
-public void testUpdateHighWatermarkQuorumSizeOne() {
-LeaderState state = newLeaderState(singleton(localId), 15L);
-assertEquals(Optional.empty(), state.highWatermark());
-assertFalse(state.updateLocalState(new LogOffsetMetadata(15L)));
-assertEquals(emptySet(), state.nonAcknowledgingVoters());
-assertEquals(Optional.empty(), state.highWatermark());
-assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
-assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
-assertTrue(state.updateLocalState(new LogOffsetMetadata(20)));
-assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
-}
+//@Test
+//public void testUpdateHighWatermarkQuorumSizeOne() {
+//LeaderState state = newLeaderState(singleton(localId), 15L);
+//assertEquals(Optional.empty(), state.highWatermark());
+//assertFalse(state.updateLocalState(new LogOffsetMetadata(15L)));
+//assertEquals(emptySet(), state.nonAcknowledgingVoters());
+//assertEquals(Optional.empty(), state.highWatermark());
+//assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
+//assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
+//assertTrue(state.updateLocalState(new LogOffsetMetadata(20)));
+//assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
+//}
+//
+//@Test
+//public void testNonMonotonicLocalEndOffsetUpdate() {
+//LeaderState state = newLeaderState(singleton(localId), 15L);
+//assertEquals(Optional.empty(), state.highWatermark());
+//assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
+//assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
+//assertThrows(IllegalStateException.class,
+//() -> state.updateLocalState(new LogOffsetMetadata(15L)));
+//}
+//
+//@Test
+//public void testLastCaughtUpTimeVoters() {
+//int node1 = 1;
+//int node2 = 2;
+//int currentTime = 1000;
+//int fetchTime = 0;
+//int caughtUpTime = -1;
+//LeaderState state = newLeaderState(mkSet(localId, node1, node2), 
10L);
+//assertEquals(Optional.empty(), state.highWatermark());
+//assertFalse(state.updateLocalState(new LogOffsetMetadata(10L)));
+//assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
+//assertEquals(Optional.empty(), state.highWatermark());
+//
+//// Node 1 falls behind
+//assertFalse(state.updateLocalState(new LogOffsetMetadata(11L)));
+//assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(10L)));
+//assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//// Node 1 catches up to leader
+//assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(11L)));
+//caughtUpTime = fetchTime;
+//assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//// Node 1 falls behind
+//assertFalse(state.updateLocalState(new LogOffsetMetadata(100L)));
+//assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(50L)));
+//assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//// Node 1 catches up to the last fetch offset
+//int prevFetchTime = fetchTime;
+//assertFalse(state.updateLocalState(new LogOffsetMetadata(200L)));
+//assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(100L)));
+//caughtUpTime = prevFetchTime;
+//assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
+//assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
+//
+//// Node2 has never caught up to leader
+//assertEquals(-1L, describeVoterState(state, node2, 
currentTime).lastCaughtUpTimestamp());
+//

  1   2   3   4   >