dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185193708


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:
+ *   This is the initial state of the state machine. The state machine starts 
here when the next epoch
+ *   does not match the target epoch. It means that a new target assignment 
has been installed so the
+ *   reconciliation process must restart. In this state, the Assigned, 
Revoking and Assigning sets are
+ *   computed. If Revoking is not empty, the state machine transitions to 
REVOKING; if Assigning is not
+ *   empty, it transitions to ASSIGNING; otherwise it transitions to STABLE.
+ *
+ * - REVOKING:
+ *   This state means that the member must revoke partitions before it can 
transition to the next epoch
+ *   and thus start receiving new partitions. This is to guarantee that 
offsets of revoked partitions
+ *   are committed with the current epoch. The member transitions to the next 
state only when it has
+ *   acknowledged the revocation.
+ *
+ * - ASSIGNING:
+ *   This state means that the member waits on partitions which are still 
owned by other members in the
+ *   group. It remains in this state until they are all freed up.
+ *
+ * - STABLE:
+ *   This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 
if the
+     * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+     */
+    private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the 
member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions;
+
+    /**
+     * Constructs the CurrentAssignmentBuilder based on the current state of 
the
+     * provided consumer group member.
+     *
+     * @param member The consumer group member that must be reconciled.
+     */
+    public CurrentAssignmentBuilder(ConsumerGroupMember member) {
+        this.member = Objects.requireNonNull(member);
+    }
+
+    /**
+     * Sets the target assignment epoch and the target assignment that the
+     * consumer group member must be reconciled to.
+     *
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withTargetAssignment(
+        int targetAssignmentEpoch,
+        Assignment targetAssignment
+    ) {
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
+        this.targetAssignment = Objects.requireNonNull(targetAssignment);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current epoch of a
+     * partition. This is used by the state machine to determine if a
+     * partition is free or still used by another member.
+     *
+     * @param currentPartitionEpoch A BiFunction which gets the epoch of a
+     *                              topic id / partitions id pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentPartitionEpoch(
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch
+    ) {
+        this.currentPartitionEpoch = 
Objects.requireNonNull(currentPartitionEpoch);
+        return this;
+    }
+
+    /**
+     * Sets the partitions currently owned by the member. This comes directly
+     * from the last ConsumerGroupHeartbeat request. This is used to determine
+     * if the member has revoked the necessary partitions.
+     *
+     * @param ownedTopicPartitions A list of topic-partitions.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withOwnedTopicPartitions(
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions
+    ) {
+        this.ownedTopicPartitions = ownedTopicPartitions;
+        return this;
+    }
+
+    /**
+     * Builds the next state for the member or keep the current one if it
+     * is not possible to move forward with the current state.
+     *
+     * @return A new ConsumerGroupMember or the current one.
+     */
+    public ConsumerGroupMember build() {
+        // A new target assignment has been installed, we need to restart
+        // the reconciliation loop from the beginning.
+        if (targetAssignmentEpoch != member.nextMemberEpoch()) {
+            return transitionToInitialState();
+        }
+
+        switch (member.state()) {
+            // Check if the partitions have been revoked by the member.
+            case REVOKING:
+                return maybeTransitionFromRevokingToAssigningOrStable();
+
+            // Check if pending partitions have been freed up.
+            case ASSIGNING:
+                return maybeTransitionFromAssigningToAssigningOrStable();
+
+            // Nothing to do.
+            case STABLE:
+                return member;
+        }
+
+        return member;
+    }
+
+    /**
+     * Transitions to the initial state. Here we compute the Assigned,
+     * Revoking and Assigning sets.
+     *
+     * @return A new ConsumerGroupMember.
+     */
+    private ConsumerGroupMember transitionToInitialState() {

Review Comment:
   Thinking a little more about it, I think that `transitionToInitialState` was 
not too bad after all. In the main javadoc, we clearly define the initial state 
and explain what it does. I agree that it is a transient state but keeping the 
name aligned with the doc makes sense.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:
+ *   This is the initial state of the state machine. The state machine starts 
here when the next epoch
+ *   does not match the target epoch. It means that a new target assignment 
has been installed so the
+ *   reconciliation process must restart. In this state, the Assigned, 
Revoking and Assigning sets are
+ *   computed. If Revoking is not empty, the state machine transitions to 
REVOKING; if Assigning is not
+ *   empty, it transitions to ASSIGNING; otherwise it transitions to STABLE.
+ *
+ * - REVOKING:
+ *   This state means that the member must revoke partitions before it can 
transition to the next epoch
+ *   and thus start receiving new partitions. This is to guarantee that 
offsets of revoked partitions
+ *   are committed with the current epoch. The member transitions to the next 
state only when it has
+ *   acknowledged the revocation.
+ *
+ * - ASSIGNING:
+ *   This state means that the member waits on partitions which are still 
owned by other members in the
+ *   group. It remains in this state until they are all freed up.
+ *
+ * - STABLE:
+ *   This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+    /**
+     * The consumer group member which is reconciled.
+     */
+    private final ConsumerGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private Assignment targetAssignment;
+
+    /**
+     * A function which returns the current epoch of a topic-partition or -1 
if the
+     * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+     */
+    private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+    /**
+     * The partitions owned by the consumer. This is directly provided by the 
member in the
+     * ConsumerGroupHeartbeat request.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions;
+
+    /**
+     * Constructs the CurrentAssignmentBuilder based on the current state of 
the
+     * provided consumer group member.
+     *
+     * @param member The consumer group member that must be reconciled.
+     */
+    public CurrentAssignmentBuilder(ConsumerGroupMember member) {
+        this.member = Objects.requireNonNull(member);
+    }
+
+    /**
+     * Sets the target assignment epoch and the target assignment that the
+     * consumer group member must be reconciled to.
+     *
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withTargetAssignment(
+        int targetAssignmentEpoch,
+        Assignment targetAssignment
+    ) {
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
+        this.targetAssignment = Objects.requireNonNull(targetAssignment);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current epoch of a
+     * partition. This is used by the state machine to determine if a
+     * partition is free or still used by another member.
+     *
+     * @param currentPartitionEpoch A BiFunction which gets the epoch of a
+     *                              topic id / partitions id pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentPartitionEpoch(
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch
+    ) {
+        this.currentPartitionEpoch = 
Objects.requireNonNull(currentPartitionEpoch);
+        return this;
+    }
+
+    /**
+     * Sets the partitions currently owned by the member. This comes directly
+     * from the last ConsumerGroupHeartbeat request. This is used to determine
+     * if the member has revoked the necessary partitions.
+     *
+     * @param ownedTopicPartitions A list of topic-partitions.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withOwnedTopicPartitions(
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions
+    ) {
+        this.ownedTopicPartitions = ownedTopicPartitions;
+        return this;
+    }
+
+    /**
+     * Builds the next state for the member or keep the current one if it
+     * is not possible to move forward with the current state.
+     *
+     * @return A new ConsumerGroupMember or the current one.
+     */
+    public ConsumerGroupMember build() {
+        // A new target assignment has been installed, we need to restart
+        // the reconciliation loop from the beginning.
+        if (targetAssignmentEpoch != member.nextMemberEpoch()) {
+            return transitionToInitialState();
+        }
+
+        switch (member.state()) {
+            // Check if the partitions have been revoked by the member.
+            case REVOKING:
+                return maybeTransitionFromRevokingToAssigningOrStable();
+
+            // Check if pending partitions have been freed up.
+            case ASSIGNING:
+                return maybeTransitionFromAssigningToAssigningOrStable();
+
+            // Nothing to do.
+            case STABLE:
+                return member;
+        }
+
+        return member;
+    }
+
+    /**
+     * Transitions to the initial state. Here we compute the Assigned,
+     * Revoking and Assigning sets.
+     *
+     * @return A new ConsumerGroupMember.
+     */
+    private ConsumerGroupMember transitionToInitialState() {

Review Comment:
   Thinking a little more about it, I think that `transitionToInitialState` was 
not too bad after all. In the main javadoc, we clearly define the initial state 
and explain what it does. I agree that it is a transient state but keeping the 
name aligned with the doc makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to