[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185475014 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), 
updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185460028 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), 
updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185172881 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions; + +/**
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185168753 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. 
+ */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions; + +/** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ +public
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1184358012 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: Review Comment: maybe we can link the ConsumerGroupMember.MemberState docs to this? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. + * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned