[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1187624902 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The epoch of the member when the state was last updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: Review Comment: Updated the comment. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1186823970 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The epoch of the member when the state was last updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: Review Comment: We do but it is a transient state so it is not part of the member's states. We transition to it [here](https://github.com/apache/kafka/pull/13638/files/0395a1780acd73e735bfee21da733075db85504d#diff-6c5e23803064f4b7a122eee29736d036b9cfe244e69f48751ba163d62e2bf35fR176). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185752497 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), 
updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185749017 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), 
updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185204551 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185195017 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. 
+ */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ +public
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185193708 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. 
+ */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ +public
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185188619 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1184677397 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); 
+assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(11, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1184669415 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); 
+assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(11, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r118484 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); 
+assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(11, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1184655502 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. 
+ */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ +public
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1184654172 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + * when it has revoked the partitions that it does not own or if it + * does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + * the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + * to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + * in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE - This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE - This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1184651164 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + * when it has revoked the partitions that it does not own or if it + * does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + * the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + * to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + * in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE - This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE - This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1184644685 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183933777 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. Review Comment: Will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183933309 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282744 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282261 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. Review Comment: That's correct. Previous and next epoch could have bigger deltas. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183282063 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183280551 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183279847 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. 
+ */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions; + +/** +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183278628 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still Review Comment: The coordinator is actually assigning partitions to the member in this state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183278201 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what Review Comment: This is the set of partitions currently assigned to or owned by the member. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1183277863 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions Review Comment: Renamed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1179091537 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. 
The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions Review Comment: It contains only the partitions that are pending -- still owned by other members. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org