Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on PR #15972:
URL: https://github.com/apache/kafka/pull/15972#issuecomment-2127918475

   Yep, just noticed this. Sorry about that. We're taking a look.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-23 Thread via GitHub


dajac commented on PR #15972:
URL: https://github.com/apache/kafka/pull/15972#issuecomment-2126312436

   Hey @ableegoldman @apourchet, I see new failures in trunk that seem to be related to this PR. The last build of this PR had 100+ failures: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15972/8/tests. Could you please take a look?





Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-22 Thread via GitHub


ableegoldman merged PR #15972:
URL: https://github.com/apache/kafka/pull/15972





Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-20 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1607404895


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1974,6 +1974,15 @@ public Set<InternalTopicConfig> nonSourceChangelogTopics() {
         return topicConfigs;
     }
 
+    /**
+     *
+     * @return the set of changelog topics, which includes both source changelog topics and non
+     * source changelog topics.
+     */
+    public Set<String> changelogTopics() {
+        return Collections.unmodifiableSet(new HashSet<>(stateChangelogTopics.keySet()));

Review Comment:
   I think you can skip the `new HashSet` step; it's pretty much redundant with the `unmodifiableSet`. Since we don't plan on modifying the returned set, it's better to just wrap the `keySet` directly and save a bunch of unnecessary copying.
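   
   Here is a minimal sketch of the two options. It assumes a plain `Map<String, String>` in place of the builder's `stateChangelogTopics` map, and the class and method names are hypothetical. Wrapping `keySet()` directly avoids the copy but returns a live, read-only view. The copied version is a snapshot:
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical helper; stateChangelogTopics is modeled as Map<String, String>
   // rather than the real map of topic name -> InternalTopicConfig.
   final class ChangelogTopicsSketch {
   
       // Suggested form: read-only view over the live key set, no copying.
       static Set<String> changelogTopicsView(final Map<String, String> stateChangelogTopics) {
           return Collections.unmodifiableSet(stateChangelogTopics.keySet());
       }
   
       // Form in the PR: copies every key into a new HashSet, then wraps the copy.
       static Set<String> changelogTopicsCopy(final Map<String, String> stateChangelogTopics) {
           return Collections.unmodifiableSet(new HashSet<>(stateChangelogTopics.keySet()));
       }
   
       public static void main(final String[] args) {
           final Map<String, String> stateChangelogTopics = new HashMap<>();
           stateChangelogTopics.put("app-store-changelog", "config");
   
           final Set<String> view = changelogTopicsView(stateChangelogTopics);
           final Set<String> copy = changelogTopicsCopy(stateChangelogTopics);
   
           // A later change to the map shows up in the view but not in the copy.
           stateChangelogTopics.put("app-other-changelog", "config");
           System.out.println(view.size()); // 2
           System.out.println(copy.size()); // 1
   
           try {
               view.add("not-allowed");
           } catch (final UnsupportedOperationException e) {
               System.out.println("view is read-only");
           }
       }
   }
   ```
   
   If callers only read the set while building a single assignment, the view is enough. The copy only matters when a caller needs a snapshot that stays fixed after the builder's map changes.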






Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-20 Thread via GitHub


apourchet commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1606917357


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);
+        freshTopicPartitionInfo.forEach((topic, partitionInfos) -> {
+            partitionInfos.forEach(partitionInfo -> {
+                final int partition = partitionInfo.partition();
+                final TopicPartition topicPartition = new TopicPartition(topic, partition);
+                final List<Node> replicas = partitionInfo.replicas();
+                if (replicas == null || replicas.isEmpty()) {
+                    LOG.error("No replicas found for topic partition {}: {}", topic, partition);
+                    return;
+                }
+
+                final Set<String> racks = replicas.stream().filter(Node::hasRack).map(Node::rack).collect(
+                    Collectors.toSet());
+                racksForTopicPartition.computeIfAbsent(topicPartition, k -> new HashSet<>());
+                racksForTopicPartition.get(topicPartition).addAll(racks);
+            });
+        });
+
+        return racksForTopicPartition;
+    }
+
+    public static Set<String> topicsWithStaleMetadata(final Cluster cluster, final Set<TopicPartition> topicPartitions) {
+        final Set<String> topicsWithStaleMetadata = new HashSet<>();
+        for (final TopicPartition topicPartition : topicPartitions) {
+            final PartitionInfo partitionInfo = cluster.partition(topicPartition);
+            if (partitionInfo == null) {
+                LOG.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                continue;

Review Comment:
   This isn't how the current rack aware code works:
   ```
   for (final TopicPartition topicPartition : topicPartitions) {
       final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
       if (partitionInfo == null) {
           log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
           return false;
       }
       final Node[] replica = partitionInfo.replicas();
       if (replica == null || replica.length == 
   ```
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-20 Thread via GitHub


apourchet commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1606913055


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);

Review Comment:
   I agree; I wrote it this way to mimic the exact usage pattern of the RackAwareAssigner. Once this is all wired up and tested, though, we can make optimizations and changes like this one (and the lazy rack info one).






Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605660608


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *  {@code ApplicationState}
-     * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
-     *  tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      * assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
 
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
Review Comment:
   Sorry for the wall of text. It might not seem like a huge deal, but in an app with only source-changelog partitions, doing this saves the assignor from having to make any DescribeTopics request, since there are no non-source changelogs.
   
   And yes, apps with only source changelogs do exist. They're pretty common for certain kinds of table-based processing, especially apps that make heavy use of IQ. Saving a remote fetch is actually a pretty big deal. Doing one in the middle of an assignment makes the rebalance vulnerable to timing out, especially when brokers are under heavy load or the app is already experiencing rebalancing issues.
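   
   The following minimal sketch illustrates the saving. All names are hypothetical, and this is not the PR's code. The remote DescribeTopics round trip is modeled as a `Function`, and the sketch skips that call entirely when there are no non-source changelog partitions. That is the case for an app whose only changelogs are source-changelogs:
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Function;
   
   final class RackLookupSketch {
       // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
       record Partition(String topic, int partition) { }
   
       // remoteDescribe stands in for a DescribeTopics round trip to the brokers; it is
       // only invoked when there is at least one non-source changelog partition.
       static Map<Partition, Set<String>> racksForNonSourceChangelogs(
               final Set<Partition> nonSourceChangelogPartitions,
               final Function<Set<Partition>, Map<Partition, Set<String>>> remoteDescribe) {
           if (nonSourceChangelogPartitions.isEmpty()) {
               // e.g. an app whose only changelogs are source-changelogs: no remote call at all
               return Collections.emptyMap();
           }
           return remoteDescribe.apply(nonSourceChangelogPartitions);
       }
   
       public static void main(final String[] args) {
           final int[] remoteCalls = {0};
           final Function<Set<Partition>, Map<Partition, Set<String>>> fakeDescribe = partitions -> {
               remoteCalls[0]++;
               final Map<Partition, Set<String>> racks = new HashMap<>();
               partitions.forEach(p -> racks.put(p, Set.of("rack-a")));
               return racks;
           };
   
           racksForNonSourceChangelogs(Set.of(), fakeDescribe);
           System.out.println(remoteCalls[0]); // 0
   
           racksForNonSourceChangelogs(Set.of(new Partition("app-store-changelog", 0)), fakeDescribe);
           System.out.println(remoteCalls[0]); // 1
       }
   }
   ```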






Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659907


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *  {@code ApplicationState}
-     * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
-     *  tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      * assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
 
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
Review Comment:
   To be more precise, I'm imagining something like this:
   
   ```
   final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
   final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
   final Set<TopicPartition> nonSourceChangelogTopicPartitions = new HashSet<>();
   
   for (final Map.Entry<TaskId, Set<TopicPartition>> entry : sourcePartitionsForTask.entrySet()) {
       final TaskId taskId = entry.getKey();
       final Set<TopicPartition> taskSourcePartitions = entry.getValue();
       final Set<TopicPartition> taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
       final Set<TopicPartition> taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
       taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
   
       logicalTaskIds.add(taskId);
       sourceTopicPartitions.addAll(taskSourcePartitions);
       changelogTopicPartitions.addAll(taskChangelogPartitions);
       nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
   }
   ```
   
   Then we pass `nonSourceChangelogTopicPartitions` into `#getRacksForTopicPartition` instead of the `changelogTopicPartitions` set.






Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659120


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *  {@code ApplicationState}
-     * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
-     *  tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      * assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
 
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
Review Comment:
   Note that we'll also want to deduplicate the source-changelog partitions for the rack id computation. We should include them in the source topics and remove them from the changelog topics passed into the `#getRacksForTopicPartition` call. Of course, we still need `changelogTopicPartitions` as well. So we'll want a third set, `nonSourceChangelogTopicPartitions`, that's used specifically for the rack id computation.
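   
   Here is a minimal, self-contained sketch of that deduplication. It uses simplified stand-ins: a hypothetical `Partition` record in place of `TopicPartition` and `Integer` task ids in place of `TaskId`. For each task, it removes the task's source partitions from its changelog partitions. What remains is the set that would be handed to the rack lookup:
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   final class NonSourceChangelogSketch {
       // Hypothetical stand-ins: Partition for TopicPartition, Integer for TaskId.
       record Partition(String topic, int partition) { }
   
       // Collects, across all tasks, the changelog partitions that are NOT also one of
       // that task's source partitions; only these would need a rack lookup via describe.
       static Set<Partition> nonSourceChangelogPartitions(
               final Map<Integer, Set<Partition>> sourcePartitionsForTask,
               final Map<Integer, Set<Partition>> changelogPartitionsForTask) {
           final Set<Partition> result = new HashSet<>();
           for (final Map.Entry<Integer, Set<Partition>> entry : changelogPartitionsForTask.entrySet()) {
               final Set<Partition> taskNonSource = new HashSet<>(entry.getValue());
               taskNonSource.removeAll(sourcePartitionsForTask.getOrDefault(entry.getKey(), Set.of()));
               result.addAll(taskNonSource);
           }
           return result;
       }
   
       public static void main(final String[] args) {
           // Task 0 reads "table-topic" and reuses it as a changelog (a source-changelog),
           // and also has a regular changelog "app-store-changelog".
           final Partition sourceChangelog = new Partition("table-topic", 0);
           final Partition regularChangelog = new Partition("app-store-changelog", 0);
   
           final Map<Integer, Set<Partition>> source = new HashMap<>();
           source.put(0, Set.of(sourceChangelog));
           final Map<Integer, Set<Partition>> changelogs = new HashMap<>();
           changelogs.put(0, Set.of(sourceChangelog, regularChangelog));
   
           System.out.println(nonSourceChangelogPartitions(source, changelogs));
           // prints [Partition[topic=app-store-changelog, partition=0]]
       }
   }
   ```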






Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605656530


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *  {@code ApplicationState}
-     * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
-     *  tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      * assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
 
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            changelogTopicPartitions.addAll(partitions);
+        });
+
+        final Map<TopicPartition, Set<String>> racksForSourcePartitions = RackUtils.getRacksForTopicPartition(
+            cluster, internalTopicManager, sourceTopicPartitions, false);
+        final Map<TopicPartition, Set<String>> racksForChangelogPartitions = RackUtils.getRacksForTopicPartition(
Review Comment:
   The rack info is nontrivial to compute and always makes a remote call, which can take a long time and can even time out or otherwise fail. Not every assignor/app will actually use it. So I'm thinking we should initialize it lazily, only if/when the user actually requests the rack info.
   
   I'm totally happy to push that into a follow-up PR to keep the scope well-defined, so don't worry about it for now. We'd still need everything you implemented here. We would just be moving it around and/or subbing in function pointers instead of passing data structures around directly, so it shouldn't have any impact on how this PR goes. I just wanted to make a note so I don't forget.
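   
   Here is one way the lazy initialization could look. It uses a memoizing `java.util.function.Supplier` as the "function pointer". The `LazyRackInfo` class is hypothetical; it is not part of the PR or of the KIP-924 API:
   
   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Supplier;
   
   // Hypothetical wrapper that defers an expensive rack lookup until someone asks for it.
   final class LazyRackInfo<K> {
       private final Supplier<Map<K, Set<String>>> loader;
       private Map<K, Set<String>> cached; // stays null until the first successful load
   
       LazyRackInfo(final Supplier<Map<K, Set<String>>> loader) {
           this.loader = loader;
       }
   
       // Runs the loader on first use only. If the loader throws, nothing is cached,
       // so a later call will try again.
       synchronized Map<K, Set<String>> racks() {
           if (cached == null) {
               cached = loader.get();
           }
           return cached;
       }
   
       public static void main(final String[] args) {
           final int[] loads = {0};
           final LazyRackInfo<String> rackInfo = new LazyRackInfo<>(() -> {
               loads[0]++;
               return Map.of("input-topic-0", Set.of("rack-a", "rack-b"));
           });
   
           System.out.println(loads[0]); // 0: nothing fetched yet
           rackInfo.racks();
           rackInfo.racks();
           System.out.println(loads[0]); // 1: fetched once, then reused
       }
   }
   ```
   
   Because `racks()` is the only entry point, an assignor that never calls it never triggers the remote fetch. Making `racks()` synchronized is a conservative choice in case the object is shared across threads.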






Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-17 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605639072


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *  {@code ApplicationState}
-     * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
-     *  tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      * assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
 
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
Review Comment:
   I suppose this doesn't hurt anything since `logicalTaskIds` is a Set, but the taskIds returned by the partition grouper should be the same for the source and changelog topics, so you can remove this line.
   
   (Alternatively, you can create the `logicalTaskIds` set up front by copying the keyset of one of the partitionsForTask maps, but that's just an implementation detail, up to you.) However, I would consider adding a check to make sure these two maps return the same set of tasks. It doesn't need to scan the entire thing; a simple size check should do:
   
   ```
   if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
       log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
                 sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
       throw new TaskAssignmentException("Partition grouper returned a different number of tasks "
           + "for source topics than for changelog topics");
   }
   ```
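   A minimal, self-contained sketch of the alternative described above: take the logical task ids from the source grouping's key set and compare them against the changelog grouping's key set, which is stricter than comparing sizes. To keep it runnable without Kafka on the classpath, task ids and partitions are plain `String`s, `IllegalStateException` stands in for `TaskAssignmentException`, and the helper name `logicalTaskIds` is hypothetical. It also assumes both groupings are expected to cover exactly the same tasks; if a subtopology with no changelog topics can yield no changelog entry, the check would need to be relaxed.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LogicalTaskIds {

    // Hypothetical helper: copies the task ids from the source-topic grouping and verifies that
    // the changelog-topic grouping produced exactly the same task ids.
    // K stands in for TaskId and V for Set<TopicPartition> in the real code.
    public static <K, V> Set<K> logicalTaskIds(final Map<K, V> sourcePartitionsForTask,
                                               final Map<K, V> changelogPartitionsForTask) {
        final Set<K> taskIds = new HashSet<>(sourcePartitionsForTask.keySet());
        if (!taskIds.equals(changelogPartitionsForTask.keySet())) {
            // Stand-in for TaskAssignmentException, which is not on the classpath in this sketch.
            throw new IllegalStateException(
                "Partition grouper returned tasks " + sourcePartitionsForTask.keySet()
                    + " for source topics but " + changelogPartitionsForTask.keySet()
                    + " for changelog topics");
        }
        return taskIds;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> source = Map.of(
            "0_0", Set.of("input-0"),
            "0_1", Set.of("input-1"));
        final Map<String, Set<String>> changelog = Map.of(
            "0_0", Set.of("store-changelog-0"),
            "0_1", Set.of("store-changelog-1"));
        // Prints both task ids; iteration order of the HashSet is unspecified.
        System.out.println(logicalTaskIds(source, changelog));
    }
}
```

   Comparing key sets costs one pass over the keys, so it is still cheap, and it also catches the case where both groupings return the same number of tasks but for different task ids.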



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple container class corresponding to a given {@link 

Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-16 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1604122125


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+
+public class DefaultTaskInfo implements TaskInfo {
+
+private final TaskId id;
+private final boolean isStateful;
+private final Map> partitionToRackIds;
+private final Set stateStoreNames;
+private final Set inputTopicPartitions;
+private final Set changelogTopicPartitions;
+
+public DefaultTaskInfo(final TaskId id,
+   final boolean isStateful,
+   final Map> partitionToRackIds,
+   final Set stateStoreNames,
+   final Set inputTopicPartitions,
+   final Set changelogTopicPartitions) {
+this.id = id;
+this.partitionToRackIds = unmodifiableMap(partitionToRackIds);
+this.isStateful = isStateful;
+this.stateStoreNames = unmodifiableSet(stateStoreNames);
+this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions);
+this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions);
+}
+
+public static DefaultTaskInfo of(final TaskId taskId,

Review Comment:
   Since this is an internal API, a normal public constructor is enough, so you can drop the static `of` factory. The static factory pattern is only for public classes where we want a "nice-looking" fluent API.
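   To illustrate the convention the comment refers to, here is a sketch with two hypothetical classes (not the actual Kafka classes): an internal value class that exposes a plain public constructor, and a public-facing class with a private constructor and a static `of(...)` factory. The internal one wraps its input in an unmodifiable view, as the `DefaultTaskInfo` constructor does; all class and field names are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ConstructorStyles {

    // Internal class: every caller lives inside the codebase, so a plain public constructor is enough.
    public static final class InternalTaskInfo {
        private final String id;
        private final Set<String> stateStoreNames;

        public InternalTaskInfo(final String id, final Set<String> stateStoreNames) {
            this.id = id;
            // Unmodifiable view, mirroring the wrapping done in DefaultTaskInfo's constructor.
            this.stateStoreNames = Collections.unmodifiableSet(stateStoreNames);
        }

        public String id() {
            return id;
        }

        public Set<String> stateStoreNames() {
            return stateStoreNames;
        }
    }

    // Public-facing class: a private constructor plus a static factory reads fluently
    // and keeps the freedom to change how instances are created later.
    public static final class PublicTaskInfo {
        private final String id;

        private PublicTaskInfo(final String id) {
            this.id = id;
        }

        public static PublicTaskInfo of(final String id) {
            return new PublicTaskInfo(id);
        }

        public String id() {
            return id;
        }
    }

    public static void main(final String[] args) {
        final InternalTaskInfo internal = new InternalTaskInfo("0_1", new HashSet<>(Set.of("store")));
        final PublicTaskInfo external = PublicTaskInfo.of("0_1");
        // prints "0_1 [store] 0_1"
        System.out.println(internal.id() + " " + internal.stateStoreNames() + " " + external.id());
    }
}
```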




[PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-15 Thread via GitHub


apourchet opened a new pull request, #15972:
URL: https://github.com/apache/kafka/pull/15972

   This rack information is required to compute rack-aware assignments, which many of the current assignors do.
   
   The internal ClientMetadata class was also updated to carry this rack information.
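   As a rough illustration of how per-partition rack information can feed a rack-aware assignor, the sketch below counts how many of a task's partitions have no replica on a given client's rack, i.e. partitions that would need cross-rack fetches. It assumes a `partitionToRackIds`-style map from partition to the racks hosting its replicas (as in the `DefaultTaskInfo` field) and an optional client rack id; partitions are plain `String`s, the method name `crossRackPartitions` is hypothetical, and the cost model is not taken from any existing Kafka Streams assignor.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class RackAwareCost {

    // Hypothetical cost function: number of the task's partitions with no replica on the client's rack.
    // If the client reported no rack, every partition is counted as cross-rack
    // (a conservative assumption made for this sketch only).
    public static long crossRackPartitions(final Map<String, Set<String>> partitionToRackIds,
                                           final Optional<String> clientRackId) {
        if (!clientRackId.isPresent()) {
            return partitionToRackIds.size();
        }
        final String rack = clientRackId.get();
        return partitionToRackIds.values().stream()
            .filter(racks -> !racks.contains(rack))
            .count();
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> partitionToRackIds = Map.of(
            "input-0", Set.of("rack-a", "rack-b"),
            "input-1", Set.of("rack-b", "rack-c"));
        System.out.println(crossRackPartitions(partitionToRackIds, Optional.of("rack-a"))); // prints 1
        System.out.println(crossRackPartitions(partitionToRackIds, Optional.empty()));      // prints 2
    }
}
```

   An assignor could sum such a cost over candidate task placements and prefer the placement with the lowest total, which is why each client's rack id has to be passed along with the client metadata.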
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

