Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on PR #15972: URL: https://github.com/apache/kafka/pull/15972#issuecomment-2127918475 Yep, just noticed this. Sorry about that. We're taking a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
dajac commented on PR #15972: URL: https://github.com/apache/kafka/pull/15972#issuecomment-2126312436 Hey @ableegoldman @apourchet, I see new failures in trunk that seem to be related to this PR. The last build of this PR had 100+ failures: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15972/8/tests. Could you please take a look?
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman merged PR #15972: URL: https://github.com/apache/kafka/pull/15972
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1607404895 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ## @@ -1974,6 +1974,15 @@ public Set nonSourceChangelogTopics() { return topicConfigs; } +/** + * + * @return the set of changelog topics, which includes both source changelog topics and non + * source changelog topics. + */ +public Set changelogTopics() { +return Collections.unmodifiableSet(new HashSet<>(stateChangelogTopics.keySet())); Review Comment: I think you can skip the `new HashSet` step. It's pretty much redundant with the `unmodifiableSet` wrapper, and since we don't plan on modifying the returned set, it's better to wrap the `keySet` directly and save a bunch of unnecessary copying.
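For anyone following along, here is a minimal sketch of the two variants discussed in this comment. The class `ChangelogTopicsSketch`, its `register` method, and the `Map<String, String>` value type are hypothetical stand-ins and not the real `InternalTopologyBuilder`; only the field name `stateChangelogTopics` mirrors the diff. One thing worth keeping in mind is that wrapping the `keySet` directly returns a read-only view of the live map rather than a snapshot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the builder; only the map name mirrors the diff.
class ChangelogTopicsSketch {
    private final Map<String, String> stateChangelogTopics = new HashMap<>();

    void register(final String changelogTopic, final String storeName) {
        stateChangelogTopics.put(changelogTopic, storeName);
    }

    // Variant from the diff: defensive copy first, then a read-only wrapper (a snapshot).
    Set<String> changelogTopicsCopy() {
        return Collections.unmodifiableSet(new HashSet<>(stateChangelogTopics.keySet()));
    }

    // Variant suggested in the review: read-only view over the live key set, no copying.
    Set<String> changelogTopicsView() {
        return Collections.unmodifiableSet(stateChangelogTopics.keySet());
    }

    public static void main(final String[] args) {
        final ChangelogTopicsSketch builder = new ChangelogTopicsSketch();
        builder.register("app-store-a-changelog", "store-a");

        final Set<String> copy = builder.changelogTopicsCopy();
        final Set<String> view = builder.changelogTopicsView();

        // A later registration is visible through the view but not through the copy.
        builder.register("app-store-b-changelog", "store-b");

        System.out.println(copy.size()); // prints 1
        System.out.println(view.size()); // prints 2
    }
}
```

Both variants reject `add`/`remove` from callers; the difference only matters if the underlying map can still change after the set has been handed out.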
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
apourchet commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1606917357 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java: ## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RackUtils { +private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class); + +public static Map> getRacksForTopicPartition(final Cluster cluster, + final InternalTopicManager internalTopicManager, + final Set topicPartitions, + final boolean isChangelog) { +final Set topicsToDescribe = new HashSet<>(); +if (isChangelog) { + topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect( +Collectors.toSet())); +} else { +topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions)); +} + +final Set topicsWithUpToDateMetadata = topicPartitions.stream() +.filter(partition -> !topicsToDescribe.contains(partition.topic())) +.collect(Collectors.toSet()); +final Map> racksForTopicPartition = knownRacksForPartition( +cluster, topicsWithUpToDateMetadata); + +final Map> freshTopicPartitionInfo = +describeTopics(internalTopicManager, topicsToDescribe); +freshTopicPartitionInfo.forEach((topic, partitionInfos) -> { +partitionInfos.forEach(partitionInfo -> { +final int partition = partitionInfo.partition(); +final TopicPartition topicPartition = new TopicPartition(topic, partition); +final List replicas = partitionInfo.replicas(); +if (replicas == null || replicas.isEmpty()) { +LOG.error("No replicas found for topic partition {}: {}", topic, partition); +return; +} + +final Set racks = 
replicas.stream().filter(Node::hasRack).map(Node::rack).collect( +Collectors.toSet()); +racksForTopicPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()); +racksForTopicPartition.get(topicPartition).addAll(racks); +}); +}); + +return racksForTopicPartition; +} + +public static Set topicsWithStaleMetadata(final Cluster cluster, final Set topicPartitions) { +final Set topicsWithStaleMetadata = new HashSet<>(); +for (final TopicPartition topicPartition : topicPartitions) { +final PartitionInfo partitionInfo = cluster.partition(topicPartition); +if (partitionInfo == null) { +LOG.error("TopicPartition {} doesn't exist in cluster", topicPartition); +continue; Review Comment: This isn't how the current rack aware code works: ```for (final TopicPartition topicPartition : topicPartitions) { final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition); if (partitionInfo == null) { log.error("TopicPartition {} doesn't exist in cluster", topicPartition); return false; } final Node[] replica = partitionInfo.replicas(); if (replica == null || replica.length ==
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
apourchet commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1606913055 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java: ## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RackUtils { +private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class); + +public static Map> getRacksForTopicPartition(final Cluster cluster, + final InternalTopicManager internalTopicManager, + final Set topicPartitions, + final boolean isChangelog) { +final Set topicsToDescribe = new HashSet<>(); +if (isChangelog) { + topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect( +Collectors.toSet())); +} else { +topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions)); +} + +final Set topicsWithUpToDateMetadata = topicPartitions.stream() +.filter(partition -> !topicsToDescribe.contains(partition.topic())) +.collect(Collectors.toSet()); +final Map> racksForTopicPartition = knownRacksForPartition( +cluster, topicsWithUpToDateMetadata); + +final Map> freshTopicPartitionInfo = +describeTopics(internalTopicManager, topicsToDescribe); Review Comment: I agree, I wrote it this way to mimic the exact pattern of use that the RackAwareAssigner uses. Once this is all wired and tested though we can make optimizations and changes like this one (and the lazy rack info one). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1605660608 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr * * @param clientMetadataMap the map of process id to client metadata used to build an immutable * {@code ApplicationState} - * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful - * tasks that need to be reassigned. * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task * assignments. */ -private ApplicationState buildApplicationState(final Map clientMetadataMap, - final Set statefulTasks) { -final Set statelessTasks = new HashSet<>(); -for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) { -final ClientState clientState = clientEntry.getValue().state; -statelessTasks.addAll(clientState.statelessActiveTasks()); +private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata, + final Map clientMetadataMap, + final Map topicGroups, + final Cluster cluster) { +final Map> sourceTopicsByGroup = new HashMap<>(); +final Map> changelogTopicsByGroup = new HashMap<>(); +for (final Map.Entry entry : topicGroups.entrySet()) { +final Set sourceTopics = entry.getValue().sourceTopics; +final Set changelogTopics = entry.getValue().stateChangelogTopics() +.stream().map(t -> t.name).collect(Collectors.toSet()); +sourceTopicsByGroup.put(entry.getKey(), sourceTopics); +changelogTopicsByGroup.put(entry.getKey(), changelogTopics); } +final Map> sourcePartitionsForTask = +partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster); +final Map> changelogPartitionsForTask = +partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster); + +final Set logicalTaskIds = new HashSet<>(); +final Set sourceTopicPartitions = new HashSet<>(); 
+sourcePartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); +sourceTopicPartitions.addAll(partitions); +}); +final Set changelogTopicPartitions = new HashSet<>(); +changelogPartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); Review Comment: Sorry for the wall of text. It might not seem like a huge deal, but if it's an app with only source-changelog partitions, then doing this will save the assignor from having to make any DescribeTopics request, since there are no non-source changelogs. And yes, apps with only source changelogs do exist; they're pretty common for certain kinds of table-based processing (and especially apps that make heavy use of IQ). Saving a remote fetch is actually a pretty big deal: doing one in the middle of an assignment makes the rebalance vulnerable to timing out, especially when brokers are under heavy load or the app is experiencing rebalancing issues to begin with.
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659907 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr * * @param clientMetadataMap the map of process id to client metadata used to build an immutable * {@code ApplicationState} - * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful - * tasks that need to be reassigned. * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task * assignments. */ -private ApplicationState buildApplicationState(final Map clientMetadataMap, - final Set statefulTasks) { -final Set statelessTasks = new HashSet<>(); -for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) { -final ClientState clientState = clientEntry.getValue().state; -statelessTasks.addAll(clientState.statelessActiveTasks()); +private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata, + final Map clientMetadataMap, + final Map topicGroups, + final Cluster cluster) { +final Map> sourceTopicsByGroup = new HashMap<>(); +final Map> changelogTopicsByGroup = new HashMap<>(); +for (final Map.Entry entry : topicGroups.entrySet()) { +final Set sourceTopics = entry.getValue().sourceTopics; +final Set changelogTopics = entry.getValue().stateChangelogTopics() +.stream().map(t -> t.name).collect(Collectors.toSet()); +sourceTopicsByGroup.put(entry.getKey(), sourceTopics); +changelogTopicsByGroup.put(entry.getKey(), changelogTopics); } +final Map> sourcePartitionsForTask = +partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster); +final Map> changelogPartitionsForTask = +partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster); + +final Set logicalTaskIds = new HashSet<>(); +final Set sourceTopicPartitions = new HashSet<>(); 
+sourcePartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); +sourceTopicPartitions.addAll(partitions); +}); +final Set changelogTopicPartitions = new HashSet<>(); +changelogPartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); Review Comment: To be more precise, I'm imagining something like this:
```
final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
final Set<TopicPartition> nonSourceChangelogTopicPartitions = new HashSet<>();
// iterate the per-task map returned by the partition grouper, not the flat set
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : sourcePartitionsForTask.entrySet()) {
    final TaskId taskId = entry.getKey();
    final Set<TopicPartition> taskSourcePartitions = entry.getValue();
    final Set<TopicPartition> taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
    // changelogs that are not also source topics are the only ones that need a describe
    final Set<TopicPartition> taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
    taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
    logicalTaskIds.add(taskId);
    sourceTopicPartitions.addAll(taskSourcePartitions);
    changelogTopicPartitions.addAll(taskChangelogPartitions);
    nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
}
```
Then we pass the `nonSourceChangelogTopicPartitions` set into `#getRacksForTopicPartition` instead of the `changelogTopicPartitions` set.
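The set-difference idea in this comment can be tried out in isolation. The following is only a sketch under stated assumptions: plain `String` values stand in for `TaskId` and `TopicPartition`, and the class and method names (`NonSourceChangelogSketch`, `nonSourceChangelogPartitions`) are made up for illustration and are not part of the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class NonSourceChangelogSketch {
    // Strings stand in for TaskId (keys) and TopicPartition (values); this is an
    // assumption made to keep the example free of Kafka dependencies.
    static Set<String> nonSourceChangelogPartitions(
            final Map<String, Set<String>> sourcePartitionsForTask,
            final Map<String, Set<String>> changelogPartitionsForTask) {
        final Set<String> result = new HashSet<>();
        for (final Map.Entry<String, Set<String>> entry : sourcePartitionsForTask.entrySet()) {
            // Copy before removing so the caller's per-task sets are left untouched.
            final Set<String> taskChangelogs = new HashSet<>(
                changelogPartitionsForTask.getOrDefault(entry.getKey(), Collections.emptySet()));
            taskChangelogs.removeAll(entry.getValue());
            result.addAll(taskChangelogs);
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> sources = new HashMap<>();
        final Map<String, Set<String>> changelogs = new HashMap<>();

        // Task 0_0 reuses its source topic as the changelog (a source changelog).
        sources.put("0_0", Collections.singleton("input-0"));
        changelogs.put("0_0", Collections.singleton("input-0"));

        // Task 1_0 has a dedicated changelog topic.
        sources.put("1_0", Collections.singleton("orders-0"));
        changelogs.put("1_0", Collections.singleton("app-agg-changelog-0"));

        // Only the dedicated changelog would need a remote describe.
        System.out.println(nonSourceChangelogPartitions(sources, changelogs)); // prints [app-agg-changelog-0]
    }
}
```

If the resulting set is empty, as it would be for an app with only source changelogs, the caller could skip the describe request entirely, which is the saving described in the review.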
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1605659120 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr * * @param clientMetadataMap the map of process id to client metadata used to build an immutable * {@code ApplicationState} - * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful - * tasks that need to be reassigned. * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task * assignments. */ -private ApplicationState buildApplicationState(final Map clientMetadataMap, - final Set statefulTasks) { -final Set statelessTasks = new HashSet<>(); -for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) { -final ClientState clientState = clientEntry.getValue().state; -statelessTasks.addAll(clientState.statelessActiveTasks()); +private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata, + final Map clientMetadataMap, + final Map topicGroups, + final Cluster cluster) { +final Map> sourceTopicsByGroup = new HashMap<>(); +final Map> changelogTopicsByGroup = new HashMap<>(); +for (final Map.Entry entry : topicGroups.entrySet()) { +final Set sourceTopics = entry.getValue().sourceTopics; +final Set changelogTopics = entry.getValue().stateChangelogTopics() +.stream().map(t -> t.name).collect(Collectors.toSet()); +sourceTopicsByGroup.put(entry.getKey(), sourceTopics); +changelogTopicsByGroup.put(entry.getKey(), changelogTopics); } +final Map> sourcePartitionsForTask = +partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster); +final Map> changelogPartitionsForTask = +partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster); + +final Set logicalTaskIds = new HashSet<>(); +final Set sourceTopicPartitions = new HashSet<>(); 
+sourcePartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); +sourceTopicPartitions.addAll(partitions); +}); +final Set changelogTopicPartitions = new HashSet<>(); +changelogPartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); Review Comment: Note that we'll also want to deduplicate the source-changelog partitions for the rack id computation. We should include them in the source topics and remove them from the changelog topics passed into the `#getRacksForTopicPartition` call. Of course we still need the changelogTopicPartitions as well, so we'll want a third set, `nonSourceChangelogTopicPartitions`, that's specifically for the rack id computation.
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1605656530 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr * * @param clientMetadataMap the map of process id to client metadata used to build an immutable * {@code ApplicationState} - * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful - * tasks that need to be reassigned. * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task * assignments. */ -private ApplicationState buildApplicationState(final Map clientMetadataMap, - final Set statefulTasks) { -final Set statelessTasks = new HashSet<>(); -for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) { -final ClientState clientState = clientEntry.getValue().state; -statelessTasks.addAll(clientState.statelessActiveTasks()); +private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata, + final Map clientMetadataMap, + final Map topicGroups, + final Cluster cluster) { +final Map> sourceTopicsByGroup = new HashMap<>(); +final Map> changelogTopicsByGroup = new HashMap<>(); +for (final Map.Entry entry : topicGroups.entrySet()) { +final Set sourceTopics = entry.getValue().sourceTopics; +final Set changelogTopics = entry.getValue().stateChangelogTopics() +.stream().map(t -> t.name).collect(Collectors.toSet()); +sourceTopicsByGroup.put(entry.getKey(), sourceTopics); +changelogTopicsByGroup.put(entry.getKey(), changelogTopics); } +final Map> sourcePartitionsForTask = +partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster); +final Map> changelogPartitionsForTask = +partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster); + +final Set logicalTaskIds = new HashSet<>(); +final Set sourceTopicPartitions = new HashSet<>(); 
+sourcePartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); +sourceTopicPartitions.addAll(partitions); +}); +final Set changelogTopicPartitions = new HashSet<>(); +changelogPartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); +changelogTopicPartitions.addAll(partitions); +}); + +final Map> racksForSourcePartitions = RackUtils.getRacksForTopicPartition( +cluster, internalTopicManager, sourceTopicPartitions, false); +final Map> racksForChangelogPartitions = RackUtils.getRacksForTopicPartition( Review Comment: Since the rack info is nontrivial to compute and always makes a remote call (which can take a long time and even time out or otherwise fail), and not every assignor/app will actually use it, I'm thinking maybe we should initialize it lazily, only if/when the user actually requests the rack info. I'm totally happy to push that into a followup PR to keep the scope well-defined, so don't worry about it for now. We'd still need everything you implemented here and would just be moving it around and/or subbing in function pointers instead of passing around data structures directly, so it shouldn't have any impact on how this PR goes. Just wanted to make a note so I don't forget.
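As a rough illustration of the lazy-initialization idea raised here, the sketch below wraps an expensive lookup in a memoizing `Supplier` so that it runs at most once, and only on first use. Everything in it is hypothetical: `Lazy`, `describeRacks`, and the `remoteCalls` counter are invented for the example and are not how the followup PR implements it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

class LazyRackInfoSketch {
    // Hypothetical helper: defers the wrapped lookup until get() is first called,
    // then caches the result. A failed lookup is not cached, so get() can be retried.
    static final class Lazy<T> implements Supplier<T> {
        private final Supplier<T> loader;
        private T value;
        private boolean loaded;

        Lazy(final Supplier<T> loader) {
            this.loader = loader;
        }

        @Override
        public synchronized T get() {
            if (!loaded) {
                value = loader.get();
                loaded = true;
            }
            return value;
        }
    }

    // Counts how often the (simulated) remote lookup actually runs.
    static int remoteCalls = 0;

    // Stand-in for the remote describe-topics call; returns partition -> rack ids.
    static Map<String, Set<String>> describeRacks() {
        remoteCalls++;
        return Collections.singletonMap("app-agg-changelog-0", Collections.singleton("rack-a"));
    }

    public static void main(final String[] args) {
        final Supplier<Map<String, Set<String>>> racks = new Lazy<>(LazyRackInfoSketch::describeRacks);

        System.out.println(remoteCalls); // prints 0, nothing has been fetched yet

        racks.get();
        racks.get();

        System.out.println(remoteCalls); // prints 1, the second get() used the cached value
    }
}
```

An assignor that never asks for rack info would never trigger the lookup, which is the case the comment is trying to optimize for.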
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1605639072 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr * * @param clientMetadataMap the map of process id to client metadata used to build an immutable * {@code ApplicationState} - * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful - * tasks that need to be reassigned. * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task * assignments. */ -private ApplicationState buildApplicationState(final Map clientMetadataMap, - final Set statefulTasks) { -final Set statelessTasks = new HashSet<>(); -for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) { -final ClientState clientState = clientEntry.getValue().state; -statelessTasks.addAll(clientState.statelessActiveTasks()); +private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata, + final Map clientMetadataMap, + final Map topicGroups, + final Cluster cluster) { +final Map> sourceTopicsByGroup = new HashMap<>(); +final Map> changelogTopicsByGroup = new HashMap<>(); +for (final Map.Entry entry : topicGroups.entrySet()) { +final Set sourceTopics = entry.getValue().sourceTopics; +final Set changelogTopics = entry.getValue().stateChangelogTopics() +.stream().map(t -> t.name).collect(Collectors.toSet()); +sourceTopicsByGroup.put(entry.getKey(), sourceTopics); +changelogTopicsByGroup.put(entry.getKey(), changelogTopics); } +final Map> sourcePartitionsForTask = +partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster); +final Map> changelogPartitionsForTask = +partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster); + +final Set logicalTaskIds = new HashSet<>(); +final Set sourceTopicPartitions = new HashSet<>(); 
+sourcePartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); +sourceTopicPartitions.addAll(partitions); +}); +final Set changelogTopicPartitions = new HashSet<>(); +changelogPartitionsForTask.forEach((taskId, partitions) -> { +logicalTaskIds.add(taskId); Review Comment: I suppose this doesn't hurt anything since `logicalTaskIds` is a Set, but the taskIds returned by the partition grouper should be the same for the source and changelog topics. So you can remove this line (alternatively, you can create the `logicalTaskIds` set up front by copying the keyset of one of the partitionsForTask maps, but that's just an implementation detail, up to you). However, I would probably consider adding a check to make sure these two maps return the same set of tasks. It doesn't need to scan the entire thing, maybe just a simple
```
if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
    log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
              sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
    throw new TaskAssignmentException(/* error msg */);
}
```
## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + + +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; + +/** + * A simple container class corresponding to a given {@link
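The size check suggested in the review comment on `StreamsPartitionAssignor` can be tried outside of Kafka. The following is a minimal sketch under stated assumptions, not the code that was merged: `TaskSetCheck` and `sharedTaskIds` are hypothetical names, plain `String`s stand in for `TaskId` and `TopicPartition`, and `IllegalStateException` stands in for `TaskAssignmentException`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka: illustrates the suggested sanity check.
final class TaskSetCheck {

    // Returns a copy of the task ids after a cheap size comparison of the two
    // groupings. Strings stand in for TaskId (keys) and TopicPartition (values).
    static Set<String> sharedTaskIds(final Map<String, Set<String>> sourcePartitionsForTask,
                                     final Map<String, Set<String>> changelogPartitionsForTask) {
        if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
            // IllegalStateException stands in for TaskAssignmentException here.
            throw new IllegalStateException(String.format(
                "Partition grouper returned %d tasks for source topics but %d tasks for changelog topics",
                sourcePartitionsForTask.size(), changelogPartitionsForTask.size()));
        }
        // Copy the key set of one map up front instead of adding ids in both loops.
        return new HashSet<>(sourcePartitionsForTask.keySet());
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> source = new HashMap<>();
        source.put("0_0", Collections.singleton("input-0"));
        final Map<String, Set<String>> changelog = new HashMap<>();
        changelog.put("0_0", Collections.singleton("store-changelog-0"));
        System.out.println(sharedTaskIds(source, changelog));
    }
}
```

Comparing sizes only catches a mismatch in the number of tasks. Comparing the two key sets with `equals` would also catch differing ids, at the cost of the full scan that the comment says is not required.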
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1604122125 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.TaskInfo; + +public class DefaultTaskInfo implements TaskInfo { + +private final TaskId id; +private final boolean isStateful; +private final Map> partitionToRackIds; +private final Set stateStoreNames; +private final Set inputTopicPartitions; +private final Set changelogTopicPartitions; + +public DefaultTaskInfo(final TaskId id, + final boolean isStateful, + final Map> partitionToRackIds, + final Set stateStoreNames, + final Set inputTopicPartitions, + final Set changelogTopicPartitions) { +this.id = id; +this.partitionToRackIds = unmodifiableMap(partitionToRackIds); +this.isStateful = isStateful; +this.stateStoreNames = unmodifiableSet(stateStoreNames); +this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions); +this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions); +} + +public static DefaultTaskInfo of(final TaskId taskId,

Review Comment: Since this is an internal API, you can just have a normal public constructor. The static factory method is only for public classes where we want to make a "nice looking" fluent API.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.TaskInfo; + +public class DefaultTaskInfo implements TaskInfo { + +private final TaskId id; +private final boolean isStateful; +private final Map> partitionToRackIds; +private final Set stateStoreNames; +private final Set inputTopicPartitions; +private final Set changelogTopicPartitions; + +public DefaultTaskInfo(final TaskId id, + final boolean isStateful, + final Map> partitionToRackIds, + final Set stateStoreNames, + final Set inputTopicPartitions, + final Set
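The first review comment in this message, on `DefaultTaskInfo.of`, contrasts a static factory with a plain public constructor. The sketch below shows both side by side; `SketchTaskInfo` is a hypothetical stand-in with only two fields, and `String` stands in for `TaskId`. It is meant to illustrate the point of the comment, not to reproduce the class in the PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for an internal class such as DefaultTaskInfo.
final class SketchTaskInfo {
    private final String id;                   // stands in for TaskId
    private final Set<String> stateStoreNames;

    // Plain public constructor: the form the review suggests for internal classes.
    public SketchTaskInfo(final String id, final Set<String> stateStoreNames) {
        this.id = id;
        this.stateStoreNames = Collections.unmodifiableSet(stateStoreNames);
    }

    // Static factory: here it only forwards to the constructor, so for an
    // internal class it adds a second entry point without adding behavior.
    public static SketchTaskInfo of(final String id, final Set<String> stateStoreNames) {
        return new SketchTaskInfo(id, stateStoreNames);
    }

    public String id() {
        return id;
    }

    public Set<String> stateStoreNames() {
        return stateStoreNames;
    }

    public static void main(final String[] args) {
        final Set<String> stores = new HashSet<>();
        stores.add("counts");
        final SketchTaskInfo viaConstructor = new SketchTaskInfo("0_0", stores);
        final SketchTaskInfo viaFactory = SketchTaskInfo.of("0_0", stores);
        System.out.println(viaConstructor.id().equals(viaFactory.id()));
    }
}
```

Both call sites produce equivalent objects, which is why the factory is optional for a class that only internal code constructs.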
[PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
apourchet opened a new pull request, #15972: URL: https://github.com/apache/kafka/pull/15972 This rack information is required to compute rack-aware assignments, which many of the current assigners do. The internal ClientMetadata class was also edited to pass around this rack information. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
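To make concrete why an assigner needs per-partition rack information, here is a small illustrative sketch. It assumes a map from partition to the rack ids hosting its replicas, similar in shape to the `partitionToRackIds` field in the diff above, and counts how many of a task's partitions have no replica on a client's rack. `RackCostSketch` and `crossRackPartitions` are hypothetical names, `String`s stand in for `TopicPartition`, and the cost shown is only one possible formulation, not necessarily what the Kafka assignors compute.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka.
final class RackCostSketch {

    // Counts the task's partitions that have no replica on the client's rack.
    // Partitions with no known rack information are counted as cross-rack.
    static int crossRackPartitions(final String clientRack,
                                   final Set<String> taskPartitions,
                                   final Map<String, Set<String>> partitionToRackIds) {
        int count = 0;
        for (final String partition : taskPartitions) {
            final Set<String> racks =
                partitionToRackIds.getOrDefault(partition, Collections.emptySet());
            if (!racks.contains(clientRack)) {
                count++;
            }
        }
        return count;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> partitionToRackIds = new HashMap<>();
        partitionToRackIds.put("input-0", new HashSet<>(Arrays.asList("rack-a", "rack-b")));
        partitionToRackIds.put("input-1", new HashSet<>(Arrays.asList("rack-c")));
        final Set<String> task = new HashSet<>(Arrays.asList("input-0", "input-1"));
        System.out.println(crossRackPartitions("rack-a", task, partitionToRackIds));
    }
}
```

An assigner could prefer the client with the lowest count for a given task, which is only possible if the rack ids are available in the application state.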