dajac commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1363518262
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -19,46 +19,85 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; import java.util.Optional; /** - * Membership manager that maintains group membership for a single member following the new + * Membership manager that maintains group membership for a single member, following the new * consumer group protocol. * <p/> - * This keeps membership state and assignment updated in-memory, based on the heartbeat responses - * the member receives. It is also responsible for computing assignment for the group based on - * the metadata, if the member has been selected by the broker to do so. + * This is responsible for: + * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li> + * <li>Keeping member state as defined in {@link MemberState}.</li> + * <p/> + * Member info and state are updated based on the heartbeat responses the member receives. */ public class MembershipManagerImpl implements MembershipManager { + /** + * Group ID of the consumer group the member will be part of, provided when creating the current + * membership manager. + */ private final String groupId; + + /** + * Group instance ID to be used by the member, provided when creating the current membership manager. + */ private final Optional<String> groupInstanceId; + + /** + * Member ID assigned by the server to the member, received in a heartbeat response when + * joining the group specified in {@link #groupId} + */ private String memberId; + + /** + * Current epoch of the member. It will be set to 0 by the member, and provided to the server + * on the heartbeat request, to join the group. It will be then maintained by the server, + * incremented as the member reconciles and acknowledges the assignments it receives. It will + * be reset to 0 if the member gets fenced. + */ private int memberEpoch; + + /** + * Current state of this member as part of the consumer group, as defined in {@link MemberState} + */ private MemberState state; + + /** + * Assignor selection configured for the member, that will be sent out to the server on the + * {@link ConsumerGroupHeartbeatRequest}. This will default to using server-side assignor, + * letting the server choose the specific assignor implementation to use. + */ private AssignorSelection assignorSelection; /** * Assignment that the member received from the server and successfully processed. */ private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + /** * Assignment that the member received from the server but hasn't completely processed * yet. */ private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment; + /** - * Latest assignment that the member received from the server while a {@link #targetAssignment} - * was in process. + * Logger. */ - private Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment; + private final Logger log; - public MembershipManagerImpl(String groupId) { - this(groupId, null, null); + public MembershipManagerImpl(String groupId, LogContext logContext) { + this(groupId, null, null, logContext); } - public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) { + public MembershipManagerImpl(String groupId, + String groupInstanceId, + AssignorSelection assignorSelection, Review Comment: We will only support server-side assignors for now. The config that we need to add is a nullable string. When null, the server selects the assignor. Otherwise, we pass the name in the HB request as you said. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org