This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1257cb3a95b451a2f19505798fc9dbdfda65feda
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Thu Jun 13 21:09:19 2024 +0200

    Define the assignor interface for streams and implement a simple assignor
    
    - interfaces in assignor package ported to a streams-specific package on 
dev branch
    - implementation of a simple working assignor with unit tests
    
    See https://github.com/lucasbru/kafka/pull/20
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 .../group/GroupCoordinatorAdapterTest.scala        |   2 +-
 .../coordinator/group/GroupMetadataManager.java    |   1 +
 .../coordinator/group/streams/Assignment.java      |  15 +-
 .../group/taskassignor/AssignmentMemberSpec.java   | 196 +++++++++++++++++++++
 .../group/taskassignor/GroupAssignment.java        |  67 +++++++
 .../coordinator/group/taskassignor/GroupSpec.java  |  37 ++++
 .../group/taskassignor/GroupSpecImpl.java          |  86 +++++++++
 .../group/taskassignor/MemberAssignment.java       |  86 +++++++++
 .../group/taskassignor/MockAssignor.java           | 115 ++++++++++++
 .../group/taskassignor/TaskAssignor.java           |  45 +++++
 .../group/taskassignor/TaskAssignorException.java  |  33 ++++
 .../coordinator/group/taskassignor/TaskId.java     |  66 +++++++
 .../group/taskassignor/TopologyDescriber.java      |  37 ++++
 .../group/taskassignor/GroupSpecImplTest.java      |  72 ++++++++
 .../group/taskassignor/MockAssignorTest.java       | 195 ++++++++++++++++++++
 16 files changed, 1048 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index e03844a63f7..0c9518c67c5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3894,7 +3894,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleStreamsGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+  def handleStreamsHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val streamsHeartbeatRequest = request.body[StreamsHeartbeatRequest]
 
     if (!config.isNewGroupCoordinatorEnabled) {
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 69845c44169..d2718f2c5ad 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -100,7 +100,7 @@ class GroupCoordinatorAdapterTest {
     val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.STREAMS_HEARTBEAT, 
ApiKeys.STREAMS_HEARTBEAT.latestVersion)
-    val request = new StreamsHeartbeatRequestData()()
+    val request = new StreamsHeartbeatRequestData()
       .setGroupId("group")
 
     val future = adapter.streamsHeartbeat(ctx, request)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ee18f3d46cd..b2a013348fb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -192,6 +192,7 @@ import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe
  * 2) The replay methods which apply records to the hard state. Those are used 
in the request
  *    handling as well as during the initial loading of the records from the 
partitions.
  */
+@SuppressWarnings("JavaNCSS")
 public class GroupMetadataManager {
 
     public static class Builder {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
index 54e4cbc63a4..b80e34cbfe5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
  * An immutable assignment for a member.
  */
 public class Assignment {
+
     public static final Assignment EMPTY = new Assignment(
         Collections.emptyMap(),
         Collections.emptyMap(),
@@ -70,8 +71,12 @@ public class Assignment {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         Assignment that = (Assignment) o;
         return Objects.equals(activeTasks, that.activeTasks)
             && Objects.equals(standbyTasks, that.standbyTasks)
@@ -87,7 +92,7 @@ public class Assignment {
     public String toString() {
         return "Assignment(active tasks=" + activeTasks +
             ", standby tasks=" + standbyTasks +
-            ", warm-up tasks=" + warmupTasks +')';
+            ", warm-up tasks=" + warmupTasks + ')';
     }
 
     /**
@@ -103,8 +108,8 @@ public class Assignment {
         return new Assignment(
             record.activeTasks().stream()
                 .collect(Collectors.toMap(
-                    
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
-                    taskId -> new HashSet<>(taskId.partitions())
+                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
+                        taskId -> new HashSet<>(taskId.partitions())
                     )
                 ),
             record.standbyTasks().stream()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/AssignmentMemberSpec.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/AssignmentMemberSpec.java
new file mode 100644
index 00000000000..64ab7ece4e9
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/AssignmentMemberSpec.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The assignment specification for a Streams group member.
+ */
+public class AssignmentMemberSpec {
+
+    /**
+     * The instance ID if provided.
+     */
+    private final Optional<String> instanceId;
+
+    /**
+     * The rack ID if provided.
+     */
+    private final Optional<String> rackId;
+
+    /**
+     * Reconciled active tasks
+     */
+    private final Map<String, Set<Integer>> activeTasks;
+
+    /**
+     * Reconciled standby tasks
+     */
+    private final Map<String, Set<Integer>> standbyTasks;
+
+    /**
+     * Reconciled warm-up tasks
+     */
+    private final Map<String, Set<Integer>> warmupTasks;
+
+    /**
+     * The process ID.
+     */
+    private final String processId;
+
+    /**
+     * The client tags for a rack-aware assignment.
+     */
+    private final Map<String, String> clientTags;
+
+    /**
+     * The assignment configs for customizing the assignment.
+     */
+    private final Map<String, String> assignmentConfigs;
+
+    /**
+     * The last received cumulative task offsets of assigned tasks or dormant 
tasks.
+     */
+    private final Map<TaskId, Long> taskOffsets;
+
+    /**
+     * @return The instance ID as an Optional.
+     */
+    public Optional<String> instanceId() {
+        return instanceId;
+    }
+
+    /**
+     * @return The rack ID as an Optional.
+     */
+    public Optional<String> rackId() {
+        return rackId;
+    }
+
+    /**
+     * @return The assigned active tasks.
+     */
+    public Map<String, Set<Integer>> activeTasks() {
+        return activeTasks;
+    }
+
+    /**
+     * @return The assigned standby tasks.
+     */
+    public Map<String, Set<Integer>> standbyTasks() {
+        return standbyTasks;
+    }
+
+    /**
+     * @return The assigned warm-up tasks.
+     */
+    public Map<String, Set<Integer>> warmupTasks() {
+        return warmupTasks;
+    }
+
+    public Map<TaskId, Long> taskOffsets() {
+        return taskOffsets;
+    }
+
+    public Map<String, String> clientTags() {
+        return clientTags;
+    }
+
+    public Map<String, String> assignmentConfigs() {
+        return assignmentConfigs;
+    }
+
+    public String processId() {
+        return processId;
+    }
+
+    public AssignmentMemberSpec(final Optional<String> instanceId,
+                                final Optional<String> rackId,
+                                final Map<String, Set<Integer>> activeTasks,
+                                final Map<String, Set<Integer>> standbyTasks,
+                                final Map<String, Set<Integer>> warmupTasks,
+                                final String processId,
+                                final Map<String, String> clientTags,
+                                final Map<String, String> assignmentConfigs,
+                                final Map<TaskId, Long> taskOffsets) {
+        this.instanceId = Objects.requireNonNull(instanceId);
+        this.rackId = Objects.requireNonNull(rackId);
+        this.activeTasks = 
Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
+        this.standbyTasks = 
Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+        this.warmupTasks = 
Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
+        this.processId = Objects.requireNonNull(processId);
+        this.clientTags = 
Collections.unmodifiableMap(Objects.requireNonNull(clientTags));
+        this.assignmentConfigs = 
Collections.unmodifiableMap(Objects.requireNonNull(assignmentConfigs));
+        this.taskOffsets = 
Collections.unmodifiableMap(Objects.requireNonNull(taskOffsets));
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final AssignmentMemberSpec that = (AssignmentMemberSpec) o;
+        return Objects.equals(instanceId, that.instanceId)
+            && Objects.equals(rackId, that.rackId)
+            && Objects.equals(activeTasks, that.activeTasks)
+            && Objects.equals(standbyTasks, that.standbyTasks)
+            && Objects.equals(warmupTasks, that.warmupTasks)
+            && Objects.equals(processId, that.processId)
+            && Objects.equals(clientTags, that.clientTags)
+            && Objects.equals(assignmentConfigs, that.assignmentConfigs)
+            && Objects.equals(taskOffsets, that.taskOffsets);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            instanceId,
+            rackId,
+            activeTasks,
+            standbyTasks,
+            warmupTasks,
+            processId,
+            clientTags,
+            assignmentConfigs,
+            taskOffsets
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "AssignmentMemberSpec{" +
+            "instanceId=" + instanceId +
+            ", rackId=" + rackId +
+            ", activeTasks=" + activeTasks +
+            ", standbyTasks=" + standbyTasks +
+            ", warmupTasks=" + warmupTasks +
+            ", processId='" + processId + '\'' +
+            ", clientTags=" + clientTags +
+            ", assignmentConfigs=" + assignmentConfigs +
+            ", taskOffsets=" + taskOffsets +
+            '}';
+    }
+
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupAssignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupAssignment.java
new file mode 100644
index 00000000000..313e1d72bf1
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupAssignment.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The task assignment for a streams group.
+ */
+public class GroupAssignment {
+
+    /**
+     * The member assignments keyed by member id.
+     */
+    private final Map<String, MemberAssignment> members;
+
+    public GroupAssignment(
+        Map<String, MemberAssignment> members
+    ) {
+        Objects.requireNonNull(members);
+        this.members = members;
+    }
+
+    /**
+     * @return Member assignments keyed by member Ids.
+     */
+    public Map<String, MemberAssignment> members() {
+        return members;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        GroupAssignment that = (GroupAssignment) o;
+        return members.equals(that.members);
+    }
+
+    @Override
+    public int hashCode() {
+        return members.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "GroupAssignment(members=" + members + ')';
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java
new file mode 100644
index 00000000000..cb9c5f3e398
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The group metadata specifications required to compute the target assignment.
+ */
+public interface GroupSpec {
+
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return The list of subtopologies.
+     */
+    List<String> subtopologies();
+
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java
new file mode 100644
index 00000000000..e08acaa468e
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The assignment specification for a Streams group.
+ */
+public class GroupSpecImpl implements GroupSpec {
+
+    /**
+     * The member metadata keyed by member Id.
+     */
+    private final Map<String, AssignmentMemberSpec> members;
+
+    /**
+     * The subtopologies.
+     */
+    private final List<String> subtopologies;
+
+    public GroupSpecImpl(
+        Map<String, AssignmentMemberSpec> members,
+        List<String> subtopologies
+    ) {
+        Objects.requireNonNull(members);
+        Objects.requireNonNull(subtopologies);
+        this.members = members;
+        this.subtopologies = subtopologies;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, AssignmentMemberSpec> members() {
+        return members;
+    }
+
+    @Override
+    public List<String> subtopologies() {
+        return subtopologies;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final GroupSpecImpl groupSpec = (GroupSpecImpl) o;
+        return Objects.equals(members, groupSpec.members)
+            && Objects.equals(subtopologies, groupSpec.subtopologies);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(members, subtopologies);
+    }
+
+    @Override
+    public String toString() {
+        return "GroupSpecImpl{" +
+            "members=" + members +
+            ", partitionsPerSubtopology=" + subtopologies +
+            '}';
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MemberAssignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MemberAssignment.java
new file mode 100644
index 00000000000..42e5446b778
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MemberAssignment.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * The task assignment for a Streams group member.
+ */
+public class MemberAssignment {
+
+    /**
+     * The target tasks assigned to this member keyed by subtopologyId.
+     */
+    private final Map<String, Set<Integer>> activeTasks;
+    private final Map<String, Set<Integer>> standbyTasks;
+    private final Map<String, Set<Integer>> warmupTasks;
+
+    public MemberAssignment(final Map<String, Set<Integer>> activeTasks,
+                            final Map<String, Set<Integer>> standbyTasks,
+                            final Map<String, Set<Integer>> warmupTasks) {
+        this.activeTasks = activeTasks;
+        this.standbyTasks = standbyTasks;
+        this.warmupTasks = warmupTasks;
+    }
+
+    public Map<String, Set<Integer>> activeTasks() {
+        return activeTasks;
+    }
+
+    public Map<String, Set<Integer>> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public Map<String, Set<Integer>> warmupTasks() {
+        return warmupTasks;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final MemberAssignment that = (MemberAssignment) o;
+        return Objects.equals(activeTasks, that.activeTasks)
+            && Objects.equals(standbyTasks, that.standbyTasks)
+            && Objects.equals(warmupTasks, that.warmupTasks);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            activeTasks,
+            standbyTasks,
+            warmupTasks
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "MemberAssignment{" +
+            "activeTasks=" + activeTasks +
+            ", standbyTasks=" + standbyTasks +
+            ", warmupTasks=" + warmupTasks +
+            '}';
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java
new file mode 100644
index 00000000000..1a7502b9b52
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Mock implementation of {@link TaskAssignor} that assigns tasks to members 
in a round-robin fashion, with a bit of stickiness.
+ */
+public class MockAssignor implements TaskAssignor {
+
+    public static final String MOCK_ASSIGNOR_NAME = "mock";
+
+    @Override
+    public String name() {
+        return MOCK_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(
+        final GroupSpec groupSpec,
+        final TopologyDescriber topologyDescriber
+    ) throws TaskAssignorException {
+
+        Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();
+        Map<String, String[]> subtopologyToActiveMember = new HashMap<>();
+
+        for (String subtopology : groupSpec.subtopologies()) {
+            int numberOfPartitions = 
topologyDescriber.numPartitions(subtopology);
+            subtopologyToActiveMember.put(subtopology, new 
String[numberOfPartitions]);
+        }
+
+        // Copy existing assignment and fill temporary data structures
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+
+            Map<String, Set<Integer>> activeTasks = new 
HashMap<>(memberSpec.activeTasks());
+
+            newTargetAssignment.put(memberId, new 
MemberAssignment(activeTasks, new HashMap<>(), new HashMap<>()));
+            for (Map.Entry<String, Set<Integer>> entry : 
activeTasks.entrySet()) {
+                final String subtopologyId = entry.getKey();
+                final Set<Integer> taskIds = entry.getValue();
+                final String[] activeMembers = 
subtopologyToActiveMember.get(subtopologyId);
+                for (int taskId : taskIds) {
+                    if (activeMembers[taskId] != null) {
+                        throw new TaskAssignorException(
+                            "Task " + taskId + " of subtopology " + 
subtopologyId + " is assigned to multiple members.");
+                    }
+                    activeMembers[taskId] = memberId;
+                }
+            }
+        }
+
+        // Define priority queue to sort members by task count
+        PriorityQueue<MemberAndTaskCount> memberAndTaskCount = new 
PriorityQueue<>(Comparator.comparingInt(m -> m.taskCount));
+        memberAndTaskCount.addAll(
+            newTargetAssignment.keySet().stream()
+                .map(memberId -> new MemberAndTaskCount(memberId,
+                    
newTargetAssignment.get(memberId).activeTasks().values().stream().mapToInt(Set::size).sum()))
+                .collect(Collectors.toSet())
+        );
+
+        // Assign unassigned tasks to members with fewest tasks
+        for (Map.Entry<String, String[]> entry : 
subtopologyToActiveMember.entrySet()) {
+            final String subtopologyId = entry.getKey();
+            final String[] activeMembers = entry.getValue();
+            for (int i = 0; i < activeMembers.length; i++) {
+                if (activeMembers[i] == null) {
+                    final MemberAndTaskCount m = memberAndTaskCount.poll();
+                    if (m == null) {
+                        throw new TaskAssignorException("No member available 
to assign task " + i + " of subtopology " + subtopologyId);
+                    }
+                    
newTargetAssignment.get(m.memberId).activeTasks().computeIfAbsent(subtopologyId,
 k -> new HashSet<>()).add(i);
+                    activeMembers[i] = m.memberId;
+                    memberAndTaskCount.add(new MemberAndTaskCount(m.memberId, 
m.taskCount + 1));
+                }
+            }
+        }
+
+        return new GroupAssignment(newTargetAssignment);
+    }
+
+    private static class MemberAndTaskCount {
+
+        private final String memberId;
+        private final int taskCount;
+
+        MemberAndTaskCount(String memberId, int taskCount) {
+            this.memberId = memberId;
+            this.taskCount = taskCount;
+        }
+    }
+}
+
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignor.java
new file mode 100644
index 00000000000..c2c88b14236
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Server side task assignor used by the GroupCoordinator.
+ * <p>
+ * The interface is kept in an internal module until KIP-848 is fully 
implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface TaskAssignor {
+
+    /**
+     * Unique name for this assignor.
+     */
+    String name();
+
+    /**
+     * Assigns tasks to group members based on the given assignment 
specification and topology metadata.
+     *
+     * @param groupSpec         The assignment spec which includes member 
metadata.
+     * @param topologyDescriber The task metadata describer.
+     * @return The new assignment for the group.
+     */
+    GroupAssignment assign(
+        GroupSpec groupSpec,
+        TopologyDescriber topologyDescriber
+    ) throws TaskAssignorException;
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignorException.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignorException.java
new file mode 100644
index 00000000000..a3f7edd8a94
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskAssignorException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import org.apache.kafka.common.errors.ApiException;
+
+/** Exception thrown by {@link TaskAssignor#assign(GroupSpec, TopologyDescriber)}; it is only used internally. */
+public class TaskAssignorException extends ApiException {
+
+    /** Builds the exception from a message and the cause behind it; both are passed on to {@link ApiException}. */
+    public TaskAssignorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /** Builds the exception from a message only, which is passed on to {@link ApiException}. */
+    public TaskAssignorException(String message) {
+        super(message);
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java
new file mode 100644
index 00000000000..3b828a60413
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.Objects;
+
+/** Immutable identifier of a task: a subtopology ID plus a partition number. Equality and hashing cover both parts. */
+public final class TaskId {
+
+    private final String subtopologyId;
+    private final int partition;
+
+    /** Creates the identifier for partition {@code partition} of the subtopology {@code subtopologyId}. */
+    public TaskId(final String subtopologyId, final int partition) {
+        this.subtopologyId = subtopologyId;
+        this.partition = partition;
+    }
+
+    /** @return The subtopology ID passed to the constructor. */
+    public String subtopologyId() {
+        return subtopologyId;
+    }
+
+    /** @return The partition number passed to the constructor. */
+    public int partition() {
+        return partition;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+        // Only an instance of exactly this class can be equal; null never is.
+        if (o != null && o.getClass() == getClass()) {
+            final TaskId other = (TaskId) o;
+            return Objects.equals(subtopologyId, other.subtopologyId) && partition == other.partition;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(subtopologyId, partition);
+    }
+
+    @Override
+    public String toString() {
+        // Format: TaskId{subtopologyId='<id>', partition=<n>}
+        return "TaskId{subtopologyId='" + subtopologyId + "', partition=" + partition + '}';
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java
new file mode 100644
index 00000000000..f2a28e2cc10
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The topology describer is used by the {@link TaskAssignor} to look up task metadata of the subtopologies it assigns.
+ * <p>
+ * This interface stays in an internal module until KIP-848 is fully implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface TopologyDescriber {
+
+    /**
+     * Looks up the number of partitions, and with it the number of tasks, of a subtopology.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     * @return The number of tasks corresponding to the given subtopology ID, or -1 if the subtopology ID does not exist.
+     */
+    int numPartitions(String subtopologyId);
+
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java
new file mode 100644
index 00000000000..b9c9a8e7aaa
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+/**
+ * Unit tests verifying that {@link GroupSpecImpl} exposes members and subtopologies equal to the ones passed to its constructor.
+ */
+public class GroupSpecImplTest {
+
+    private Map<String, AssignmentMemberSpec> members;
+    private List<String> subtopologies;
+    private GroupSpecImpl groupSpec;
+
+    @BeforeEach
+    void setUp() {
+        final AssignmentMemberSpec memberSpec = new AssignmentMemberSpec(
+            Optional.of("test-instance"),
+            Optional.of("test-rack"),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+        members = new HashMap<>();
+        members.put("test-member", memberSpec);
+
+        subtopologies = new ArrayList<>();
+        subtopologies.add("test-subtopology");
+
+        groupSpec = new GroupSpecImpl(members, subtopologies);
+    }
+
+    @Test
+    void testMembers() {
+        assertEquals(members, groupSpec.members());
+    }
+
+    @Test
+    void testSubtopologies() {
+        assertEquals(subtopologies, groupSpec.subtopologies());
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
new file mode 100644
index 00000000000..bc316db8772
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+/**
+ * Unit tests for {@link MockAssignor}: empty group, single member, two members without prior tasks, and sticky reassignment.
+ */
+public class MockAssignorTest {
+
+    private final MockAssignor assignor = new MockAssignor();
+
+    /** A group without members and subtopologies must yield an assignment without members. */
+    @Test
+    public void testBasicScenario() {
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(Collections.emptyMap(), Collections.emptyList()),
+            x -> 5
+        );
+
+        assertEquals(0, result.members().size());
+    }
+
+    /** A single member must receive all four tasks of the only subtopology. */
+    @Test
+    public void testSingleMember() {
+        final AssignmentMemberSpec memberSpec = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(Collections.singletonMap("test_member", memberSpec), Collections.singletonList("test-subtopology")),
+            x -> 4
+        );
+
+        assertEquals(1, result.members().size());
+        final MemberAssignment testMember = result.members().get("test_member");
+        assertNotNull(testMember);
+        assertEquals(mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))), testMember.activeTasks());
+    }
+
+    /**
+     * Two members without prior tasks must split the four tasks of each subtopology into complementary halves, in either order.
+     * Both members are checked together, since checking each one alone would also accept two identical halves.
+     */
+    @Test
+    public void testTwoMembersTwoSubtopologies() {
+        final AssignmentMemberSpec memberSpec1 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final AssignmentMemberSpec memberSpec2 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(
+                mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)),
+                Arrays.asList("test-subtopology1", "test-subtopology2")
+            ),
+            x -> 4
+        );
+
+        final Map<String, Set<Integer>> expected1 = mkMap(
+            mkEntry("test-subtopology1", mkSet(1, 3)),
+            mkEntry("test-subtopology2", mkSet(1, 3))
+        );
+        final Map<String, Set<Integer>> expected2 = mkMap(
+            mkEntry("test-subtopology1", mkSet(0, 2)),
+            mkEntry("test-subtopology2", mkSet(0, 2))
+        );
+
+        assertEquals(2, result.members().size());
+        final MemberAssignment testMember1 = result.members().get("test_member1");
+        final MemberAssignment testMember2 = result.members().get("test_member2");
+        assertNotNull(testMember1);
+        assertNotNull(testMember2);
+        assertTrue(
+            (expected1.equals(testMember1.activeTasks()) && expected2.equals(testMember2.activeTasks()))
+                || (expected2.equals(testMember1.activeTasks()) && expected1.equals(testMember2.activeTasks())),
+            "members must hold complementary halves of the tasks"
+        );
+    }
+
+    /**
+     * Tasks passed as the third {@code AssignmentMemberSpec} argument (presumably the member's current active tasks) must stay put.
+     * The two tasks of test-subtopology2 that no member holds yet (1 and 2) must go to test_member2.
+     */
+    @Test
+    public void testTwoMembersTwoSubtopologiesStickiness() {
+        final AssignmentMemberSpec memberSpec1 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            mkMap(
+                mkEntry("test-subtopology1", mkSet(0, 2, 3)),
+                mkEntry("test-subtopology2", mkSet(0))
+            ),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final AssignmentMemberSpec memberSpec2 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            mkMap(
+                mkEntry("test-subtopology1", mkSet(1)),
+                mkEntry("test-subtopology2", mkSet(3))
+            ),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(
+                mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)),
+                Arrays.asList("test-subtopology1", "test-subtopology2")
+            ),
+            x -> 4
+        );
+
+        assertEquals(2, result.members().size());
+        final MemberAssignment testMember1 = result.members().get("test_member1");
+        final MemberAssignment testMember2 = result.members().get("test_member2");
+        assertNotNull(testMember1);
+        assertNotNull(testMember2);
+        assertEquals(mkMap(
+            mkEntry("test-subtopology1", mkSet(0, 2, 3)),
+            mkEntry("test-subtopology2", mkSet(0))
+        ), testMember1.activeTasks());
+        assertEquals(mkMap(
+            mkEntry("test-subtopology1", mkSet(1)),
+            mkEntry("test-subtopology2", mkSet(1, 2, 3))
+        ), testMember2.activeTasks());
+    }
+}


Reply via email to