lucasbru commented on code in PR #18652:
URL: https://github.com/apache/kafka/pull/18652#discussion_r1935569358


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * Represents the state of a process in the group coordinator.
+ * This includes the capacity of the process, the load on the process, and the 
tasks assigned to the process.
+ */
+

Review Comment:
   remove extra newline



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    private static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+        initialize(groupSpec, topologyDescriber);
+        GroupAssignment assignments =  doAssign(groupSpec, topologyDescriber);
+        terminate();
+        return assignments;
+    }
+
+    private GroupAssignment doAssign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) {
+        //active
+        Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        assignActive(activeTasks);
+
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private void terminate() {
+        taskPairs = null;
+        activeTaskToPrevMember = null;
+        standbyTaskToPrevMember = null;
+        processIdToState = null;

Review Comment:
   You are not resetting the integer fields here. This is a bit prone to 
forgetting to reset a specific field. How about this:
   
   You introduce a private static class `LocalState` which contains all the 
fields. `LocalState localState` becomes the only field of this class. You can, 
in one shot, just set `localState = null` at the of assign.
   
   That would even enable just passing around the object `localState` instead 
of having a field in the first place. However, not sure if that would be too 
annoying in the code.
   



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    private static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+        initialize(groupSpec, topologyDescriber);
+        GroupAssignment assignments =  doAssign(groupSpec, topologyDescriber);
+        terminate();
+        return assignments;
+    }
+
+    private GroupAssignment doAssign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) {
+        //active
+        Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        assignActive(activeTasks);
+
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private void terminate() {
+        taskPairs = null;
+        activeTaskToPrevMember = null;
+        standbyTaskToPrevMember = null;
+        processIdToState = null;
+    }
+
+    private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, 
final boolean isActive) {
+        Set<TaskId> ret = new HashSet<>();
+        for (String subtopology : topologyDescriber.subtopologies()) {
+            if (isActive || topologyDescriber.isStateful(subtopology)) {
+                int numberOfPartitions = 
topologyDescriber.numTasks(subtopology);
+                for (int i = 0; i < numberOfPartitions; i++) {
+                    ret.add(new TaskId(subtopology, i));
+                }
+            }
+        }
+        return ret;
+    }
+
+    private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
+

Review Comment:
   remove extra new line



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+

Review Comment:
   remove extra newline



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.kafka.common.utils.Utils.union;
+
+public class ProcessState {
+    private final String processId;
+    // number of members
+    private int capacity;

Review Comment:
   Did you want to address this @aliehsaeedii ?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    private static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+        initialize(groupSpec, topologyDescriber);
+        GroupAssignment assignments =  doAssign(groupSpec, topologyDescriber);
+        terminate();
+        return assignments;
+    }
+
+    private GroupAssignment doAssign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) {
+        //active
+        Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        assignActive(activeTasks);
+
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private void terminate() {
+        taskPairs = null;
+        activeTaskToPrevMember = null;
+        standbyTaskToPrevMember = null;
+        processIdToState = null;
+    }
+
+    private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, 
final boolean isActive) {
+        Set<TaskId> ret = new HashSet<>();
+        for (String subtopology : topologyDescriber.subtopologies()) {
+            if (isActive || topologyDescriber.isStateful(subtopology)) {
+                int numberOfPartitions = 
topologyDescriber.numTasks(subtopology);
+                for (int i = 0; i < numberOfPartitions; i++) {
+                    ret.add(new TaskId(subtopology, i));
+                }
+            }
+        }
+        return ret;
+    }
+
+    private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
+
+        allTasks = 0;
+        for (String subtopology : topologyDescriber.subtopologies()) {
+            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            allTasks += numberOfPartitions;
+        }
+        totalCapacity = groupSpec.members().size();
+        tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+
+        taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2);
+
+        processIdToState = new HashMap<>();
+        activeTaskToPrevMember = new HashMap<>();
+        standbyTaskToPrevMember = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final String processId = memberEntry.getValue().processId();
+            final Member member = new Member(processId, memberId);
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+
+            processIdToState.putIfAbsent(processId, new 
ProcessState(processId));
+            processIdToState.get(processId).addMember(memberId);
+
+            // prev active tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    activeTaskToPrevMember.put(new TaskId(entry.getKey(), 
partitionNo), member);
+                }
+            }
+
+            // prev standby tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.standbyTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    TaskId taskId = new TaskId(entry.getKey(), partitionNo);
+                    standbyTaskToPrevMember.putIfAbsent(taskId, new 
HashSet<>());
+                    standbyTaskToPrevMember.get(taskId).add(member);
+                }
+            }
+        }
+    }
+
+    private GroupAssignment buildGroupAssignment(final Set<String> members) {
+        final Map<String, MemberAssignment> memberAssignments = new 
HashMap<>();
+
+        final Map<String, Set<TaskId>> activeTasksAssignments = 
processIdToState.entrySet().stream()
+            .flatMap(entry -> 
entry.getValue().assignedActiveTasksByMember().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(set1, set2) -> {
+                set1.addAll(set2);
+                return set1;
+            }));
+
+        final Map<String, Set<TaskId>> standbyTasksAssignments = 
processIdToState.entrySet().stream()
+            .flatMap(entry -> 
entry.getValue().assignedStandbyTasksByMember().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(set1, set2) -> {
+                set1.addAll(set2);
+                return set1;
+            }));
+
+        for (String memberId : members) {
+            Map<String, Set<Integer>> activeTasks = new HashMap<>();
+            if (activeTasksAssignments.containsKey(memberId)) {
+                activeTasks = 
toCompactedTaskIds(activeTasksAssignments.get(memberId));
+            }
+            Map<String, Set<Integer>> standByTasks = new HashMap<>();
+
+            if (standbyTasksAssignments.containsKey(memberId)) {
+                standByTasks = 
toCompactedTaskIds(standbyTasksAssignments.get(memberId));
+            }
+            memberAssignments.put(memberId, new MemberAssignment(activeTasks, 
standByTasks, new HashMap<>()));
+        }
+
+        return new GroupAssignment(memberAssignments);
+    }
+
+    private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> 
taskIds) {
+        Map<String, Set<Integer>> ret = new HashMap<>();
+        for (TaskId taskId : taskIds) {
+            ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>());
+            ret.get(taskId.subtopologyId()).add(taskId.partition());
+        }
+        return ret;
+    }
+
+    private void assignActive(final Set<TaskId> activeTasks) {
+

Review Comment:
   remove extra new line



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java:
##########
@@ -0,0 +1,1054 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.internal.util.collections.Sets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StickyTaskAssignorTest {
+
+    public static final String NUM_STANDBY_REPLICAS_CONFIG = 
"num.standby.replicas";
+    private final StickyTaskAssignor assignor = new StickyTaskAssignor();
+
+    @Test
+    public void 
shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() {

Review Comment:
   nit: If possible, try to split the unit tests into three sections separated 
by new-lines (without inline comments):
   
   ```
   setup...
   
   functionCall...
   
   asserts...
   ```
   
   You already have this in the other test class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to