[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185475014


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185460028


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185172881


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. 
This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke 
before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually 
receive. The partitions
+ *in this set are still owned by other members in the 
group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. 
The state machine starts
+ *   here when the next epoch does not match the 
target epoch. It means that
+ *   a new target assignment has been installed so the 
reconciliation process
+ *   must restart. In this state, the Assigned, 
Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state 
machine transitions
+ *   to REVOKE; if Assigning is not empty, it 
transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE- This state means that the member must revoke 
partitions before it can
+ *   transition to the next epoch and thus start 
receiving new partitions.
+ *   The member transitions to the next state only 
when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on 
partitions which are still
+ *   owned by other members in the group. It remains 
in this state until
+ *   they are all freed up.
+ * - STABLE- This state means that the member has received all 
its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List 
ownedTopicPartitions;
+
+/**

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185168753


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:
+ *   This is the initial state of the state machine. The state machine starts 
here when the next epoch
+ *   does not match the target epoch. It means that a new target assignment 
has been installed so the
+ *   reconciliation process must restart. In this state, the Assigned, 
Revoking and Assigning sets are
+ *   computed. If Revoking is not empty, the state machine transitions to 
REVOKING; if Assigning is not
+ *   empty, it transitions to ASSIGNING; otherwise it transitions to STABLE.
+ *
+ * - REVOKING:
+ *   This state means that the member must revoke partitions before it can 
transition to the next epoch
+ *   and thus start receiving new partitions. This is to guarantee that 
offsets of revoked partitions
+ *   are committed with the current epoch. The member transitions to the next 
state only when it has
+ *   acknowledged the revocation.
+ *
+ * - ASSIGNING:
+ *   This state means that the member waits on partitions which are still 
owned by other members in the
+ *   group. It remains in this state until they are all freed up.
+ *
+ * - STABLE:
+ *   This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List 
ownedTopicPartitions;
+
+/**
+ * Constructs the CurrentAssignmentBuilder based on the current state of 
the
+ * provided consumer group member.
+ *
+ * @param member The consumer group member that must be reconciled.
+ */
+public 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-03 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1184358012


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:

Review Comment:
   maybe we can link the ConsumerGroupMember.MemberState docs to this?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned