Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on PR #16034:
URL: https://github.com/apache/kafka/pull/16034#issuecomment-2127905750

   Ah shoot, I was looking at the wrong test results; this is actually breaking 
RackAwareTaskAssignorTest. I'll revert this and we can fix the failing test in 
the resubmitted PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-23 Thread via GitHub


ableegoldman merged PR #16034:
URL: https://github.com/apache/kafka/pull/16034





Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-22 Thread via GitHub


apourchet commented on code in PR #16034:
URL: https://github.com/apache/kafka/pull/16034#discussion_r1610816330


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -513,50 +515,45 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
   + "tasks for source topics vs changelog topics.");
 }
 
-final Set sourceTopicPartitions = new HashSet<>();
-final Set nonSourceChangelogTopicPartitions = new HashSet<>();
-for (final Map.Entry> entry : sourcePartitionsForTask.entrySet()) {
-final TaskId taskId = entry.getKey();
-final Set taskSourcePartitions = entry.getValue();
-final Set taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
-final Set taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
-taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
-
-sourceTopicPartitions.addAll(taskSourcePartitions);
-nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
-}
+final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
+final Set allTopicPartitions = new HashSet<>();
+final Map> topicPartitionsForTask = new HashMap<>();
+logicalTaskIds.forEach(taskId -> {
+final Set topicPartitions = new HashSet<>();
+
+for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) {
+final boolean isSource = true;
+final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition);
+final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition(
+topicPartition, isSource, isChangelog, null);
+allTopicPartitions.add(racklessTopicPartition);
+topicPartitionsForTask.get(taskId).add(racklessTopicPartition);

Review Comment:
   Good catch!






Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-22 Thread via GitHub


ableegoldman commented on code in PR #16034:
URL: https://github.com/apache/kafka/pull/16034#discussion_r1610769106


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##
@@ -38,26 +40,37 @@ public final class RackUtils {
 
 private RackUtils() { }
 
-public static Map> getRacksForTopicPartition(final Cluster cluster,
- final InternalTopicManager internalTopicManager,
- final Set topicPartitions,
- final boolean isChangelog) {
-final Set topicsToDescribe = new HashSet<>();
-if (isChangelog) {
-topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
-Collectors.toSet()));
-} else {
-topicsToDescribe.addAll(topicsWithMissingMetadata(cluster, topicPartitions));
-}
+public static void annotateWithTopicPartitionsWithRackInfo(final Cluster cluster,
+final InternalTopicManager internalTopicManager,

Review Comment:
   nit: fix the formatting/indentation (although this will change when you 
change the method name anyway)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##
@@ -38,26 +40,37 @@ public final class RackUtils {
 
 private RackUtils() { }
 
-public static Map> getRacksForTopicPartition(final Cluster cluster,
- final InternalTopicManager internalTopicManager,
- final Set topicPartitions,
- final boolean isChangelog) {
-final Set topicsToDescribe = new HashSet<>();
-if (isChangelog) {
-topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
-Collectors.toSet()));
-} else {
-topicsToDescribe.addAll(topicsWithMissingMetadata(cluster, topicPartitions));
-}
+public static void annotateWithTopicPartitionsWithRackInfo(final Cluster cluster,

Review Comment:
   This doesn't sound quite right. Should it be 
`annotateTopicPartitionsWithRackInfo`? (i.e. there's an extra "with")



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -513,50 +515,45 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
   + "tasks for source topics vs changelog topics.");
 }
 
-final Set sourceTopicPartitions = new HashSet<>();
-final Set nonSourceChangelogTopicPartitions = new HashSet<>();
-for (final Map.Entry> entry : sourcePartitionsForTask.entrySet()) {
-final TaskId taskId = entry.getKey();
-final Set taskSourcePartitions = entry.getValue();
-final Set taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
-final Set taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
-taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
-
-sourceTopicPartitions.addAll(taskSourcePartitions);
-nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
-}
+final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
+final Set allTopicPartitions = new HashSet<>();
+final Map> topicPartitionsForTask = new HashMap<>();
+logicalTaskIds.forEach(taskId -> {
+final Set topicPartitions = new HashSet<>();
+
+for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) {
+final boolean isSource = true;
+final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition);
+final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition(
+topicPartition, isSource, isChangelog, null);
+allTopicPartitions.add(racklessTopicPartition);
+topicPartitionsForTask.get(taskId).add(racklessTopicPartition);

Review Comment:
   both here and for the changelog loop below
   
   ```suggestion
   topicPartitions.add(racklessTopicPartition);
   ```
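
For context, here is a minimal sketch of why the flagged line would likely fail. It assumes `topicPartitionsForTask` has no entry for `taskId` when the loop runs, since the quoted hunk only shows the map being created empty. The class name, the `String` stand-ins for `TaskId` and the partition type, and the trailing `put` call are illustrative assumptions, not the PR's actual code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the grouping logic in the quoted hunk.
final class TaskPartitionGrouping {

    // Shape of the flagged code: the per-task map is still empty, so
    // get(taskId) returns null and add(...) throws NullPointerException.
    static Map<String, Set<String>> groupBuggy(final Map<String, Set<String>> sourcePartitionsForTask) {
        final Map<String, Set<String>> topicPartitionsForTask = new HashMap<>();
        sourcePartitionsForTask.forEach((taskId, partitions) -> {
            final Set<String> topicPartitions = new HashSet<>();
            for (final String partition : partitions) {
                topicPartitionsForTask.get(taskId).add(partition);
            }
            // Assumed step: the local set is only registered after the loop.
            topicPartitionsForTask.put(taskId, topicPartitions);
        });
        return topicPartitionsForTask;
    }

    // Shape of the suggestion: fill the local set, then register it.
    static Map<String, Set<String>> groupFixed(final Map<String, Set<String>> sourcePartitionsForTask) {
        final Map<String, Set<String>> topicPartitionsForTask = new HashMap<>();
        sourcePartitionsForTask.forEach((taskId, partitions) -> {
            final Set<String> topicPartitions = new HashSet<>();
            for (final String partition : partitions) {
                topicPartitions.add(partition);
            }
            topicPartitionsForTask.put(taskId, topicPartitions);
        });
        return topicPartitionsForTask;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> source = new HashMap<>();
        final Set<String> partitions = new HashSet<>();
        partitions.add("input-0");
        source.put("0_0", partitions);

        System.out.println("fixed: " + groupFixed(source));
        try {
            groupBuggy(source);
        } catch (final NullPointerException e) {
            System.out.println("buggy variant threw NullPointerException");
        }
    }
}
```

If the real code registers the set in the map before the loop instead, the original line would not throw, so treat this only as an illustration of the failure mode the suggestion avoids.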




[PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-22 Thread via GitHub


apourchet opened a new pull request, #16034:
URL: https://github.com/apache/kafka/pull/16034

   This PR uses the new TaskTopicPartition structure to simplify the build
   process for the ApplicationState, which is the input to the new
   TaskAssignor#assign call.

