mjsax commented on code in PR #14108: URL: https://github.com/apache/kafka/pull/14108#discussion_r1281300901
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java: ########## @@ -156,6 +163,38 @@ public boolean isAllowedTaskMovement(final ClientState source, final ClientState return true; } + @Override + public boolean isAllowedTaskMovement(final ClientState source, + final ClientState destination, + final TaskId sourceTask, + final Map<UUID, ClientState> clientStateMap) { + + final BiConsumer<ClientState, Set<KeyValue<String, String>>> addTags = (cs, tagSet) -> { + final Map<String, String> tags = clientTagFunction.apply(cs.processId(), cs); + if (tags != null) { + tagSet.addAll(tags.entrySet().stream() + .map(entry -> KeyValue.pair(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()) + ); + } + }; + + final Set<KeyValue<String, String>> tagsWithSource = new HashSet<>(); + final Set<KeyValue<String, String>> tagsWithDestination = new HashSet<>(); + for (final ClientState clientState : clientStateMap.values()) { + if (clientState.hasAssignedTask(sourceTask) + && !clientState.processId().equals(source.processId()) Review Comment: Not sure if I can follow? Does `clientState.hasAssignedTask(sourceTask)` not imply that the `processId` of `clientState` is the same as the "source client"? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java: ########## @@ -156,6 +163,38 @@ public boolean isAllowedTaskMovement(final ClientState source, final ClientState return true; } + @Override + public boolean isAllowedTaskMovement(final ClientState source, Review Comment: Not sure if I grog the semantics of this method. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java: ########## @@ -16,8 +16,27 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.Map; +import java.util.UUID; +import org.apache.kafka.streams.processor.TaskId; + interface StandbyTaskAssignor extends TaskAssignor { default boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { return true; } + + /** + * If a specific task can be moved from source to destination + * @param source Source client + * @param destination Destination client + * @param sourceTask Task to move + * @param clientStateMap All client metadata + * @return True if task can be moved, false otherwise + */ + default boolean isAllowedTaskMovement(final ClientState source, + final ClientState destination, + final TaskId sourceTask, + final Map<UUID, ClientState> clientStateMap) { + return true; Review Comment: Why is the default `true`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org