This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1257cb3a95b451a2f19505798fc9dbdfda65feda Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Thu Jun 13 21:09:19 2024 +0200 Define the assignor interface for streams and implement a simple assignor - interfaces in assignor package ported to a streams-specific package on dev branch - implementation of a simple working assingor with unit tests See https://github.com/lucasbru/kafka/pull/20 --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- .../group/GroupCoordinatorAdapterTest.scala | 2 +- .../coordinator/group/GroupMetadataManager.java | 1 + .../coordinator/group/streams/Assignment.java | 15 +- .../group/taskassignor/AssignmentMemberSpec.java | 196 +++++++++++++++++++++ .../group/taskassignor/GroupAssignment.java | 67 +++++++ .../coordinator/group/taskassignor/GroupSpec.java | 37 ++++ .../group/taskassignor/GroupSpecImpl.java | 86 +++++++++ .../group/taskassignor/MemberAssignment.java | 86 +++++++++ .../group/taskassignor/MockAssignor.java | 115 ++++++++++++ .../group/taskassignor/TaskAssignor.java | 45 +++++ .../group/taskassignor/TaskAssignorException.java | 33 ++++ .../coordinator/group/taskassignor/TaskId.java | 66 +++++++ .../group/taskassignor/TopologyDescriber.java | 37 ++++ .../group/taskassignor/GroupSpecImplTest.java | 72 ++++++++ .../group/taskassignor/MockAssignorTest.java | 195 ++++++++++++++++++++ 16 files changed, 1048 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index e03844a63f7..0c9518c67c5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3894,7 +3894,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { + def handleStreamsHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { val streamsHeartbeatRequest = request.body[StreamsHeartbeatRequest] if (!config.isNewGroupCoordinatorEnabled) { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 69845c44169..d2718f2c5ad 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -100,7 +100,7 @@ class GroupCoordinatorAdapterTest { val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.STREAMS_HEARTBEAT, ApiKeys.STREAMS_HEARTBEAT.latestVersion) - val request = new StreamsHeartbeatRequestData()() + val request = new StreamsHeartbeatRequestData() .setGroupId("group") val future = adapter.streamsHeartbeat(ctx, request) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index ee18f3d46cd..b2a013348fb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -192,6 +192,7 @@ import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe * 2) The replay methods which apply records to the hard state. Those are used in the request * handling as well as during the initial loading of the records from the partitions. */ +@SuppressWarnings("JavaNCSS") public class GroupMetadataManager { public static class Builder { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java index 54e4cbc63a4..b80e34cbfe5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; * An immutable assignment for a member. */ public class Assignment { + public static final Assignment EMPTY = new Assignment( Collections.emptyMap(), Collections.emptyMap(), @@ -70,8 +71,12 @@ public class Assignment { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } Assignment that = (Assignment) o; return Objects.equals(activeTasks, that.activeTasks) && Objects.equals(standbyTasks, that.standbyTasks) @@ -87,7 +92,7 @@ public class Assignment { public String toString() { return "Assignment(active tasks=" + activeTasks + ", standby tasks=" + standbyTasks + - ", warm-up tasks=" + warmupTasks +')'; + ", warm-up tasks=" + warmupTasks + ')'; } /** @@ -103,8 +108,8 @@ public class Assignment { return new Assignment( record.activeTasks().stream() .collect(Collectors.toMap( - StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology, - taskId -> new HashSet<>(taskId.partitions()) + StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology, + taskId -> new HashSet<>(taskId.partitions()) ) ), record.standbyTasks().stream() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/AssignmentMemberSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/AssignmentMemberSpec.java new file mode 100644 index 00000000000..64ab7ece4e9 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/AssignmentMemberSpec.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * The assignment specification for a Streams group member. + */ +public class AssignmentMemberSpec { + + /** + * The instance ID if provided. + */ + private final Optional<String> instanceId; + + /** + * The rack ID if provided. + */ + private final Optional<String> rackId; + + /** + * Reconciled active tasks + */ + private final Map<String, Set<Integer>> activeTasks; + + /** + * Reconciled standby tasks + */ + private final Map<String, Set<Integer>> standbyTasks; + + /** + * Reconciled warm-up tasks + */ + private final Map<String, Set<Integer>> warmupTasks; + + /** + * The process ID. + */ + private final String processId; + + /** + * The client tags for a rack-aware assignment. + */ + private final Map<String, String> clientTags; + + /** + * The assignment configs for customizing the assignment. + */ + private final Map<String, String> assignmentConfigs; + + /** + * The last received cumulative task offsets of assigned tasks or dormant tasks. + */ + private final Map<TaskId, Long> taskOffsets; + + /** + * @return The instance ID as an Optional. + */ + public Optional<String> instanceId() { + return instanceId; + } + + /** + * @return The rack ID as an Optional. + */ + public Optional<String> rackId() { + return rackId; + } + + /** + * @return The assigned active tasks. + */ + public Map<String, Set<Integer>> activeTasks() { + return activeTasks; + } + + /** + * @return The assigned standby tasks. + */ + public Map<String, Set<Integer>> standbyTasks() { + return standbyTasks; + } + + /** + * @return The assigned warm-up tasks. + */ + public Map<String, Set<Integer>> warmupTasks() { + return warmupTasks; + } + + public Map<TaskId, Long> taskOffsets() { + return taskOffsets; + } + + public Map<String, String> clientTags() { + return clientTags; + } + + public Map<String, String> assignmentConfigs() { + return assignmentConfigs; + } + + public String processId() { + return processId; + } + + public AssignmentMemberSpec(final Optional<String> instanceId, + final Optional<String> rackId, + final Map<String, Set<Integer>> activeTasks, + final Map<String, Set<Integer>> standbyTasks, + final Map<String, Set<Integer>> warmupTasks, + final String processId, + final Map<String, String> clientTags, + final Map<String, String> assignmentConfigs, + final Map<TaskId, Long> taskOffsets) { + this.instanceId = Objects.requireNonNull(instanceId); + this.rackId = Objects.requireNonNull(rackId); + this.activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)); + this.standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)); + this.warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)); + this.processId = Objects.requireNonNull(processId); + this.clientTags = Collections.unmodifiableMap(Objects.requireNonNull(clientTags)); + this.assignmentConfigs = Collections.unmodifiableMap(Objects.requireNonNull(assignmentConfigs)); + this.taskOffsets = Collections.unmodifiableMap(Objects.requireNonNull(taskOffsets)); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AssignmentMemberSpec that = (AssignmentMemberSpec) o; + return Objects.equals(instanceId, that.instanceId) + && Objects.equals(rackId, that.rackId) + && Objects.equals(activeTasks, that.activeTasks) + && Objects.equals(standbyTasks, that.standbyTasks) + && Objects.equals(warmupTasks, that.warmupTasks) + && Objects.equals(processId, that.processId) + && Objects.equals(clientTags, that.clientTags) + && Objects.equals(assignmentConfigs, that.assignmentConfigs) + && Objects.equals(taskOffsets, that.taskOffsets); + } + + @Override + public int hashCode() { + return Objects.hash( + instanceId, + rackId, + activeTasks, + standbyTasks, + warmupTasks, + processId, + clientTags, + assignmentConfigs, + taskOffsets + ); + } + + @Override + public String toString() { + return "AssignmentMemberSpec{" + + "instanceId=" + instanceId + + ", rackId=" + rackId + + ", activeTasks=" + activeTasks + + ", standbyTasks=" + standbyTasks + + ", warmupTasks=" + warmupTasks + + ", processId='" + processId + '\'' + + ", clientTags=" + clientTags + + ", assignmentConfigs=" + assignmentConfigs + + ", taskOffsets=" + taskOffsets + + '}'; + } + +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupAssignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupAssignment.java new file mode 100644 index 00000000000..313e1d72bf1 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupAssignment.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.Map; +import java.util.Objects; + +/** + * The task assignment for a streams group. + */ +public class GroupAssignment { + + /** + * The member assignments keyed by member id. + */ + private final Map<String, MemberAssignment> members; + + public GroupAssignment( + Map<String, MemberAssignment> members + ) { + Objects.requireNonNull(members); + this.members = members; + } + + /** + * @return Member assignments keyed by member Ids. + */ + public Map<String, MemberAssignment> members() { + return members; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GroupAssignment that = (GroupAssignment) o; + return members.equals(that.members); + } + + @Override + public int hashCode() { + return members.hashCode(); + } + + @Override + public String toString() { + return "GroupAssignment(members=" + members + ')'; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java new file mode 100644 index 00000000000..cb9c5f3e398 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.List; +import java.util.Map; + +/** + * The group metadata specifications required to compute the target assignment. + */ +public interface GroupSpec { + + /** + * @return Member metadata keyed by member Id. + */ + Map<String, AssignmentMemberSpec> members(); + + /** + * @return The list of subtopologies. + */ + List<String> subtopologies(); + +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java new file mode 100644 index 00000000000..e08acaa468e --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * The assignment specification for a Streams group. + */ +public class GroupSpecImpl implements GroupSpec { + + /** + * The member metadata keyed by member Id. + */ + private final Map<String, AssignmentMemberSpec> members; + + /** + * The subtopologies. + */ + private final List<String> subtopologies; + + public GroupSpecImpl( + Map<String, AssignmentMemberSpec> members, + List<String> subtopologies + ) { + Objects.requireNonNull(members); + Objects.requireNonNull(subtopologies); + this.members = members; + this.subtopologies = subtopologies; + } + + /** + * {@inheritDoc} + */ + @Override + public Map<String, AssignmentMemberSpec> members() { + return members; + } + + @Override + public List<String> subtopologies() { + return subtopologies; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final GroupSpecImpl groupSpec = (GroupSpecImpl) o; + return Objects.equals(members, groupSpec.members) + && Objects.equals(subtopologies, groupSpec.subtopologies); + } + + @Override + public int hashCode() { + return Objects.hash(members, subtopologies); + } + + @Override + public String toString() { + return "GroupSpecImpl{" + + "members=" + members + + ", partitionsPerSubtopology=" + subtopologies + + '}'; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MemberAssignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MemberAssignment.java new file mode 100644 index 00000000000..42e5446b778 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MemberAssignment.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * The task assignment for a Streams group member. + */ +public class MemberAssignment { + + /** + * The target tasks assigned to this member keyed by subtopologyId. + */ + private final Map<String, Set<Integer>> activeTasks; + private final Map<String, Set<Integer>> standbyTasks; + private final Map<String, Set<Integer>> warmupTasks; + + public MemberAssignment(final Map<String, Set<Integer>> activeTasks, + final Map<String, Set<Integer>> standbyTasks, + final Map<String, Set<Integer>> warmupTasks) { + this.activeTasks = activeTasks; + this.standbyTasks = standbyTasks; + this.warmupTasks = warmupTasks; + } + + public Map<String, Set<Integer>> activeTasks() { + return activeTasks; + } + + public Map<String, Set<Integer>> standbyTasks() { + return standbyTasks; + } + + public Map<String, Set<Integer>> warmupTasks() { + return warmupTasks; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final MemberAssignment that = (MemberAssignment) o; + return Objects.equals(activeTasks, that.activeTasks) + && Objects.equals(standbyTasks, that.standbyTasks) + && Objects.equals(warmupTasks, that.warmupTasks); + } + + @Override + public int hashCode() { + return Objects.hash( + activeTasks, + standbyTasks, + warmupTasks + ); + } + + @Override + public String toString() { + return "MemberAssignment{" + + "activeTasks=" + activeTasks + + ", standbyTasks=" + standbyTasks + + ", warmupTasks=" + warmupTasks + + '}'; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java new file mode 100644 index 00000000000..1a7502b9b52 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Mock implementation of {@link TaskAssignor} that assigns tasks to members in a round-robin fashion, with a bit of stickiness. + */ +public class MockAssignor implements TaskAssignor { + + public static final String MOCK_ASSIGNOR_NAME = "mock"; + + @Override + public String name() { + return MOCK_ASSIGNOR_NAME; + } + + @Override + public GroupAssignment assign( + final GroupSpec groupSpec, + final TopologyDescriber topologyDescriber + ) throws TaskAssignorException { + + Map<String, MemberAssignment> newTargetAssignment = new HashMap<>(); + Map<String, String[]> subtopologyToActiveMember = new HashMap<>(); + + for (String subtopology : groupSpec.subtopologies()) { + int numberOfPartitions = topologyDescriber.numPartitions(subtopology); + subtopologyToActiveMember.put(subtopology, new String[numberOfPartitions]); + } + + // Copy existing assignment and fill temporary data structures + for (Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) { + final String memberId = memberEntry.getKey(); + final AssignmentMemberSpec memberSpec = memberEntry.getValue(); + + Map<String, Set<Integer>> activeTasks = new HashMap<>(memberSpec.activeTasks()); + + newTargetAssignment.put(memberId, new MemberAssignment(activeTasks, new HashMap<>(), new HashMap<>())); + for (Map.Entry<String, Set<Integer>> entry : activeTasks.entrySet()) { + final String subtopologyId = entry.getKey(); + final Set<Integer> taskIds = entry.getValue(); + final String[] activeMembers = subtopologyToActiveMember.get(subtopologyId); + for (int taskId : taskIds) { + if (activeMembers[taskId] != null) { + throw new TaskAssignorException( + "Task " + taskId + " of subtopology " + subtopologyId + " is assigned to multiple members."); + } + activeMembers[taskId] = memberId; + } + } + } + + // Define priority queue to sort members by task count + PriorityQueue<MemberAndTaskCount> memberAndTaskCount = new PriorityQueue<>(Comparator.comparingInt(m -> m.taskCount)); + memberAndTaskCount.addAll( + newTargetAssignment.keySet().stream() + .map(memberId -> new MemberAndTaskCount(memberId, + newTargetAssignment.get(memberId).activeTasks().values().stream().mapToInt(Set::size).sum())) + .collect(Collectors.toSet()) + ); + + // Assign unassigned tasks to members with fewest tasks + for (Map.Entry<String, String[]> entry : subtopologyToActiveMember.entrySet()) { + final String subtopologyId = entry.getKey(); + final String[] activeMembers = entry.getValue(); + for (int i = 0; i < activeMembers.length; i++) { + if (activeMembers[i] == null) { + final MemberAndTaskCount m = memberAndTaskCount.poll(); + if (m == null) { + throw new TaskAssignorException("No member available to assign task " + i + " of subtopology " + subtopologyId); + } + newTargetAssignment.get(m.memberId).activeTasks().computeIfAbsent(subtopologyId, k -> new HashSet<>()).add(i); + activeMembers[i] = m.memberId; + memberAndTaskCount.add(new MemberAndTaskCount(m.memberId, m.taskCount + 1)); + } + } + } + + return new GroupAssignment(newTargetAssignment); + } + + private static class MemberAndTaskCount { + + private final String memberId; + private final int taskCount; + + MemberAndTaskCount(String memberId, int taskCount) { + this.memberId = memberId; + this.taskCount = taskCount; + } + } +} + diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignor.java new file mode 100644 index 00000000000..c2c88b14236 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server side task assignor used by the GroupCoordinator. + * <p> + * The interface is kept in an internal module until KIP-848 is fully implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface TaskAssignor { + + /** + * Unique name for this assignor. + */ + String name(); + + /** + * Assigns tasks to group members based on the given assignment specification and topic metadata. + * + * @param groupSpec The assignment spec which includes member metadata. + * @param topologyDescriber The task metadata describer. + * @return The new assignment for the group. + */ + GroupAssignment assign( + GroupSpec groupSpec, + TopologyDescriber topologyDescriber + ) throws TaskAssignorException; +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignorException.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignorException.java new file mode 100644 index 00000000000..a3f7edd8a94 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignorException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import org.apache.kafka.common.errors.ApiException; + +/** + * Exception thrown by {@link TaskAssignor#assign(GroupSpec, TopologyDescriber)}}. The exception is only used internally. + */ +public class TaskAssignorException extends ApiException { + + public TaskAssignorException(String message) { + super(message); + } + + public TaskAssignorException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java new file mode 100644 index 00000000000..3b828a60413 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.Objects; + +public final class TaskId { + + private final String subtopologyId; + private final int partition; + + public TaskId(final String subtopologyId, final int partition) { + this.subtopologyId = subtopologyId; + this.partition = partition; + } + + public String subtopologyId() { + return subtopologyId; + } + + public int partition() { + return partition; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TaskId taskId = (TaskId) o; + return partition == taskId.partition && Objects.equals(subtopologyId, taskId.subtopologyId); + } + + @Override + public int hashCode() { + return Objects.hash( + subtopologyId, + partition + ); + } + + @Override + public String toString() { + return "TaskId{" + + "subtopologyId='" + subtopologyId + '\'' + + ", partition=" + partition + + '}'; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java new file mode 100644 index 00000000000..f2a28e2cc10 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * The subscribed topic describer is used by the {@link TaskAssignor} to obtain topic and task metadata of the subscribed topics. + * <p> + * The interface is kept in an internal module until KIP-848 is fully implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface TopologyDescriber { + + /** + * The number of partitions for the given subtopology. + * + * @param subtopologyId String corresponding to the subtopology. + * @return The number of tasks corresponding to the given subtopology ID, or -1 if the subtopology ID does not exist. + */ + int numPartitions(String subtopologyId); + +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java new file mode 100644 index 00000000000..b9c9a8e7aaa --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +public class GroupSpecImplTest { + + private Map<String, AssignmentMemberSpec> members; + private List<String> subtopologies; + private GroupSpecImpl groupSpec; + + @BeforeEach + void setUp() { + members = new HashMap<>(); + subtopologies = new ArrayList<>(); + + members.put("test-member", new AssignmentMemberSpec( + Optional.of("test-instance"), + Optional.of("test-rack"), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + "test-process", + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + )); + + subtopologies.add("test-subtopology"); + + groupSpec = new GroupSpecImpl( + members, + subtopologies + ); + } + + @Test + void testMembers() { + assertEquals(members, groupSpec.members()); + } + + @Test + void testSubtopologies() { + assertEquals(subtopologies, groupSpec.subtopologies()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java new file mode 100644 index 00000000000..bc316db8772 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.taskassignor; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class MockAssignorTest { + + private final MockAssignor assignor = new MockAssignor(); + + + @Test + public void testBasicScenario() { + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl( + Collections.emptyMap(), + Collections.emptyList() + ), + x -> 5 + ); + + assertEquals(0, result.members().size()); + } + + + @Test + public void testSingleMember() { + + final AssignmentMemberSpec memberSpec = new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + "test-process", + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl( + Collections.singletonMap("test_member", memberSpec), + Collections.singletonList("test-subtopology") + ), + x -> 4 + ); + + assertEquals(1, result.members().size()); + final MemberAssignment testMember = result.members().get("test_member"); + assertNotNull(testMember); + assertEquals(mkMap( + mkEntry("test-subtopology", mkSet(0, 1, 2, 3)) + ), testMember.activeTasks()); + } + + + @Test + public void testTwoMembersTwoSubtopologies() { + + final AssignmentMemberSpec memberSpec1 = new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + "test-process", + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + final AssignmentMemberSpec memberSpec2 = new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + "test-process", + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl( + mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)), + Arrays.asList("test-subtopology1", "test-subtopology2") + ), + x -> 4 + ); + + final Map<String, Set<Integer>> expected1 = mkMap( + mkEntry("test-subtopology1", mkSet(1, 3)), + mkEntry("test-subtopology2", mkSet(1, 3)) + ); + final Map<String, Set<Integer>> expected2 = mkMap( + mkEntry("test-subtopology1", mkSet(0, 2)), + mkEntry("test-subtopology2", mkSet(0, 2)) + ); + + assertEquals(2, result.members().size()); + final MemberAssignment testMember1 = result.members().get("test_member1"); + final MemberAssignment testMember2 = result.members().get("test_member2"); + assertNotNull(testMember1); + assertNotNull(testMember2); + assertTrue(expected1.equals(testMember1.activeTasks()) || expected2.equals(testMember1.activeTasks())); + assertTrue(expected1.equals(testMember2.activeTasks()) || expected2.equals(testMember2.activeTasks())); + } + + @Test + public void testTwoMembersTwoSubtopologiesStickiness() { + + final AssignmentMemberSpec memberSpec1 = new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + mkMap( + mkEntry("test-subtopology1", mkSet(0, 2, 3)), + mkEntry("test-subtopology2", mkSet(0)) + ), + Collections.emptyMap(), + Collections.emptyMap(), + "test-process", + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + final AssignmentMemberSpec memberSpec2 = new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + mkMap( + mkEntry("test-subtopology1", mkSet(1)), + mkEntry("test-subtopology2", mkSet(3)) + ), + Collections.emptyMap(), + Collections.emptyMap(), + "test-process", + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl( + mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)), + Arrays.asList("test-subtopology1", "test-subtopology2") + ), + x -> 4 + ); + + assertEquals(2, result.members().size()); + final MemberAssignment testMember1 = result.members().get("test_member1"); + final MemberAssignment testMember2 = result.members().get("test_member2"); + assertNotNull(testMember1); + assertNotNull(testMember2); + assertEquals(mkMap( + mkEntry("test-subtopology1", mkSet(0, 2, 3)), + mkEntry("test-subtopology2", mkSet(0)) + ), testMember1.activeTasks()); + assertEquals(mkMap( + mkEntry("test-subtopology1", mkSet(1)), + mkEntry("test-subtopology2", mkSet(1, 2, 3)) + ), testMember2.activeTasks()); + } + +}