mjsax commented on code in PR #14108:
URL: https://github.com/apache/kafka/pull/14108#discussion_r1281300901


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java:
##########
@@ -156,6 +163,38 @@ public boolean isAllowedTaskMovement(final ClientState source, final ClientState
         return true;
     }
 
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source,
+                                         final ClientState destination,
+                                         final TaskId sourceTask,
+                                         final Map<UUID, ClientState> clientStateMap) {
+
+        final BiConsumer<ClientState, Set<KeyValue<String, String>>> addTags = (cs, tagSet) -> {
+            final Map<String, String> tags = clientTagFunction.apply(cs.processId(), cs);
+            if (tags != null) {
+                tagSet.addAll(tags.entrySet().stream()
+                    .map(entry -> KeyValue.pair(entry.getKey(), entry.getValue()))
+                    .collect(Collectors.toList())
+                );
+            }
+        };
+
+        final Set<KeyValue<String, String>> tagsWithSource = new HashSet<>();
+        final Set<KeyValue<String, String>> tagsWithDestination = new HashSet<>();
+        for (final ClientState clientState : clientStateMap.values()) {
+            if (clientState.hasAssignedTask(sourceTask)
+                && !clientState.processId().equals(source.processId())

Review Comment:
   Not sure I follow. Doesn't `clientState.hasAssignedTask(sourceTask)` imply that the `processId` of `clientState` is the same as that of the "source client"?
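
   A minimal sketch of the situation the question turns on, assuming that `hasAssignedTask` returns true for a standby assignment as well as an active one (worth confirming against `ClientState`). Under that assumption several clients can hold the same task, so the check alone would not pin down the source client. `ToyClientState`, `otherHolders`, and the string task ids are hypothetical stand-ins, not Kafka Streams classes:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

class HasAssignedTaskSketch {

    // Hypothetical stand-in for ClientState; not the Kafka Streams class.
    static final class ToyClientState {
        final UUID processId;
        final Set<String> activeTasks = new HashSet<>();
        final Set<String> standbyTasks = new HashSet<>();

        ToyClientState(final UUID processId) {
            this.processId = processId;
        }

        // Assumption: "assigned" means active OR standby.
        boolean hasAssignedTask(final String taskId) {
            return activeTasks.contains(taskId) || standbyTasks.contains(taskId);
        }
    }

    // Counts the clients other than `source` that also hold `taskId`.
    static long otherHolders(final ToyClientState source,
                             final String taskId,
                             final List<ToyClientState> allClients) {
        return allClients.stream()
            .filter(cs -> cs.hasAssignedTask(taskId))
            .filter(cs -> !cs.processId.equals(source.processId))
            .count();
    }

    public static void main(final String[] args) {
        final ToyClientState source = new ToyClientState(UUID.randomUUID());
        source.standbyTasks.add("0_1");   // the standby copy being considered for a move

        final ToyClientState activeHost = new ToyClientState(UUID.randomUUID());
        activeHost.activeTasks.add("0_1"); // same task, active on a different client

        final ToyClientState idle = new ToyClientState(UUID.randomUUID());

        // Number of non-source clients for which hasAssignedTask("0_1") is true.
        System.out.println(otherHolders(source, "0_1", List.of(source, activeHost, idle)));
    }
}
```

   If the assumption holds, the extra `processId` comparison in the diff filters the source client out of the set of holders rather than repeating the first condition.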



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java:
##########
@@ -156,6 +163,38 @@ public boolean isAllowedTaskMovement(final ClientState source, final ClientState
         return true;
     }
 
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source,

Review Comment:
   Not sure I grok the semantics of this method.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java:
##########
@@ -16,8 +16,27 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import java.util.Map;
+import java.util.UUID;
+import org.apache.kafka.streams.processor.TaskId;
+
 interface StandbyTaskAssignor extends TaskAssignor {
     default boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
         return true;
     }
+
+    /**
+     * If a specific task can be moved from source to destination
+     * @param source Source client
+     * @param destination Destination client
+     * @param sourceTask Task to move
+     * @param clientStateMap All client metadata
+     * @return True if task can be moved, false otherwise
+     */
+    default boolean isAllowedTaskMovement(final ClientState source,
+                                          final ClientState destination,
+                                          final TaskId sourceTask,
+                                          final Map<UUID, ClientState> clientStateMap) {
+        return true;

Review Comment:
   Why is the default `true`?
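
   One possible reading, sketched below: a permissive default lets assignors with no placement constraints inherit "always allowed", while a constraint-aware assignor overrides the method to veto some moves. All names here (`MovementPolicy`, `Unconstrained`, `ZoneAware`, the zone map) are hypothetical, and the zone rule is a simplified illustration, not the logic in this PR:

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class DefaultMovementSketch {

    // Hypothetical interface; only the "permissive default" pattern mirrors the diff.
    interface MovementPolicy {
        default boolean isAllowedTaskMovement(final String source,
                                              final String destination,
                                              final String task,
                                              final Map<String, Set<String>> tasksByClient) {
            return true; // no constraints known, so never veto a move
        }
    }

    // Inherits the default: every movement is allowed.
    static final class Unconstrained implements MovementPolicy { }

    // Illustrative rule (an assumption): reject a move when some other holder
    // of the task already sits in the destination's zone.
    static final class ZoneAware implements MovementPolicy {
        private final Map<String, String> zoneByClient;

        ZoneAware(final Map<String, String> zoneByClient) {
            this.zoneByClient = zoneByClient;
        }

        @Override
        public boolean isAllowedTaskMovement(final String source,
                                             final String destination,
                                             final String task,
                                             final Map<String, Set<String>> tasksByClient) {
            final String destinationZone = zoneByClient.get(destination);
            for (final Map.Entry<String, Set<String>> entry : tasksByClient.entrySet()) {
                final String client = entry.getKey();
                final boolean isOtherHolder = !client.equals(source)
                    && !client.equals(destination)
                    && entry.getValue().contains(task);
                if (isOtherHolder && Objects.equals(zoneByClient.get(client), destinationZone)) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> tasksByClient = Map.of(
            "a", Set.of("0_1"),        // source of the move
            "b", Set.of("0_1"),        // another holder of the same task
            "c", Set.<String>of(),
            "d", Set.<String>of());
        final Map<String, String> zones = Map.of("a", "z1", "b", "z2", "c", "z2", "d", "z3");

        final MovementPolicy unconstrained = new Unconstrained();
        final MovementPolicy zoneAware = new ZoneAware(zones);

        System.out.println(unconstrained.isAllowedTaskMovement("a", "c", "0_1", tasksByClient));
        System.out.println(zoneAware.isAllowedTaskMovement("a", "c", "0_1", tasksByClient));
        System.out.println(zoneAware.isAllowedTaskMovement("a", "d", "0_1", tasksByClient));
    }
}
```

   The trade-off in such a design is that a new implementation which forgets to override the method silently allows every move, which may be the concern behind the question.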


