Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361271426


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send a new target
+ * member while a previous one hasn't been acknowledged by the member, so this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be reconciled already" +
+" exists.");
 }
 }
 
-private boolean hasPendingTargetAssignment() {
-return targetAssignment.isPresent() || nextTargetAssignment.isPresent();
-}
-
-
-/**
- * Update state and assignment as the member has successfully processed a new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully process
- */
-public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-updateAssignment(assignment);
-transitionTo(MemberState.STABLE);
-}
-
 /**
- * Update state and member info as the member was not able to process the assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided callbacks
+ * Returns true if the member has a target assignment being processed.
  */
-public void onAssignmentProcessFailure(Throwable error) {
-transitionTo(MemberState.FAILED);
-// TODO: update member info appropriately, to clear up whatever shouldn't be kept in
-//  this unrecoverable state
+private boolean hasPendingTargetAssignment() {
+return targetAssignment.isPresent();
 }
 
 private void resetEpoch() {
 this.memberEpoch = 0;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public MemberState state() {
 return state;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public AssignorSelection assignorSelection() {
 return this.assignorSelection;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
 return this.currentAssignment;
 }
 
+
+/**
+ * Assignment that the member received from the server but hasn't completely processed yet.
+ */
 // VisibleForTesting
 Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() {
 return targetAssignment;
 }
 
-// VisibleForTesting
-Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() {
-return nextTargetAssignment;
-}
-
 /**
- * Set the current assignment for the member. This indicates that the reconciliation of the
- * target assignment has been successfully completed.
- * This will clear the {@link #targetAssignment}, and take on the
- * {@link #nextTargetAssignment} if any.
+ * This indicates that the reconciliation of the target assignment has been successfully
+ * completed, so it will make it effective by assigning it to the current assignment.
  *
- * @param assignment Assignment that has been successfully processed as part of the
- *   reconciliation process.
+ * @params Assignment that has been successfully reconciled. This is expected to
+ * match the target assignment defined in {@link #targetAssignment()}
  */
 @Override
-public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-this.currentAssignment = assignment;
-if (!nextTargetAssignment.isPresent()) {
-targetAssignment = Optional.empty();
-} else {
-targetAssignment = Optional.of(nextTargetAssignment.get());
-nextTargetAssignment = Optio

Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361286203


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,46 +19,84 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member following the new
+ * Membership manager that maintains group membership for a single member, following the new
  * consumer group protocol.
  * 
- * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
- * the member receives. It is also responsible for computing assignment for the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * Keeping member info (ex. member id, member epoch, assignment, etc.)
+ * Keeping member state as defined in {@link MemberState}.
+ * 
+ * Member info and state are updated based on the heartbeat responses the member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+/**
+ * ID of the consumer group the member will be part of, provided when creating the current
+ * membership manager.
+ */
 private final String groupId;
+
+/**
+ * Group instance ID to be used by the member, provided when creating the current membership manager.
+ */
 private final Optional<String> groupInstanceId;
+
+/**
+ * Member ID assigned by the server to the member, received in a heartbeat response when
+ * joining the group specified in {@link #groupId}
+ */
 private String memberId;
+
+/**
+ * Current epoch of the member. It will be set to 0 by the member, and provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained by the server,
+ * incremented as the member reconciles and acknowledges the assignments it receives.

Review Comment:
   Do you mean on fencing errors? We do reset the epoch to 0 when the member gets fenced (and that is indeed not explained in the doc).
   On fatal errors we currently leave the epoch unchanged, on the assumption that it won't be used anymore (the member transitions to an unrecoverable FAILED state where it will not rejoin the group). This should probably be reconsidered now that OffsetFetch/Commit v9 is in sight: we wouldn't want a FAILED member to include an old epoch in an OffsetFetch/Commit request, right? Maybe just perform the OffsetFetch/Commit without an epoch?
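
   To make the two cases concrete, here is a minimal sketch of the epoch handling described in this comment. It is not the PR's code: the class `EpochSketch`, its method names, and the simplified `State` enum are hypothetical, and omitting the epoch for a FAILED member is only the option floated above, not a settled design.

```java
import java.util.OptionalInt;

// Hypothetical, simplified sketch; not the actual MembershipManagerImpl.
final class EpochSketch {
    enum State { UNJOINED, STABLE, FENCED, FAILED }

    private State state = State.UNJOINED;
    private int memberEpoch = 0; // a member joins the group with epoch 0

    /** The server maintains the epoch and returns it in heartbeat responses. */
    void onHeartbeatSuccess(int epochFromServer) {
        memberEpoch = epochFromServer;
        state = State.STABLE;
    }

    /** Fencing error: reset the epoch to 0 so the member can rejoin. */
    void onFenced() {
        memberEpoch = 0;
        state = State.FENCED;
    }

    /** Fatal error: the epoch is left unchanged; the member will not rejoin. */
    void onFatalError() {
        state = State.FAILED;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    /** Assumption: a FAILED member sends offset requests without an epoch. */
    OptionalInt epochForOffsetRequest() {
        return state == State.FAILED ? OptionalInt.empty() : OptionalInt.of(memberEpoch);
    }

    public static void main(String[] args) {
        EpochSketch member = new EpochSketch();
        member.onHeartbeatSuccess(5);
        member.onFatalError();
        // The stale epoch is still stored, but it is not exposed to offset requests.
        System.out.println(member.memberEpoch() + " " + member.epochForOffsetRequest().isPresent());
    }
}
```

   With this shape, the stale epoch stays in memory after a fatal error but never reaches an OffsetFetch/Commit request, which is the concern raised above.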



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361289406


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send a new target
+ * member while a previous one hasn't been acknowledged by the member, so this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be reconciled already" +

Review Comment:
   Done
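
   For reference, the one-assignment-at-a-time behaviour in the hunk above can be sketched in isolation as follows. This is a simplified illustration, not the PR's code: `SingleTargetSketch` is a hypothetical class, a plain `String` stands in for `ConsumerGroupHeartbeatResponseData.Assignment`, and `onTargetReconciled()` is an assumed name for the step that promotes the target to the current assignment.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for the membership manager.
final class SingleTargetSketch {
    private Optional<String> targetAssignment = Optional.empty();
    private String currentAssignment = "";
    private boolean failed = false;

    /** Accepts a new target only when no other target is pending reconciliation. */
    void setTargetAssignment(String newTarget) {
        if (targetAssignment.isPresent()) {
            failed = true; // mirrors transitionToFailed() in the hunk
            throw new IllegalStateException(
                "A target assignment pending to be reconciled already exists.");
        }
        targetAssignment = Optional.of(newTarget);
    }

    /** Assumed completion step: the reconciled target becomes the current assignment. */
    void onTargetReconciled() {
        currentAssignment = targetAssignment.orElseThrow(
            () -> new IllegalStateException("No target assignment to reconcile."));
        targetAssignment = Optional.empty();
    }

    boolean hasPendingTargetAssignment() {
        return targetAssignment.isPresent();
    }

    String currentAssignment() {
        return currentAssignment;
    }

    boolean isFailed() {
        return failed;
    }

    public static void main(String[] args) {
        SingleTargetSketch member = new SingleTargetSketch();
        member.setTargetAssignment("topic-0");
        member.onTargetReconciled();
        System.out.println(member.currentAssignment());
    }
}
```

   A second `setTargetAssignment` call before `onTargetReconciled()` marks the member as failed and throws, which matches the invariant stated in the Javadoc: the server is not expected to send a new target before the previous one is acknowledged.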






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361299217


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send a new target
+ * member while a previous one hasn't been acknowledged by the member, so this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);

Review Comment:
   Yes, sounds sensible to me. 






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361393280


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -21,53 +21,76 @@
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * Manages membership of a single member to a consumer group.
+ * 
  * Responsible for:
  * Keeping member state
  * Keeping assignment for the member
  * Computing assignment for the group if the member is required to do so
  */
 public interface MembershipManager {
 
+/**
+ * ID of the consumer group the member is part of (or wants to be part of).

Review Comment:
   Agree on both, all done.






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361404673


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send a new target
+ * member while a previous one hasn't been acknowledged by the member, so this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be reconciled already" +
+" exists.");
 }
 }
 
-private boolean hasPendingTargetAssignment() {
-return targetAssignment.isPresent() || nextTargetAssignment.isPresent();
-}
-
-
-/**
- * Update state and assignment as the member has successfully processed a new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully process
- */
-public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-updateAssignment(assignment);
-transitionTo(MemberState.STABLE);
-}
-
 /**
- * Update state and member info as the member was not able to process the assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided callbacks
+ * Returns true if the member has a target assignment being processed.
  */
-public void onAssignmentProcessFailure(Throwable error) {
-transitionTo(MemberState.FAILED);
-// TODO: update member info appropriately, to clear up whatever shouldn't be kept in
-//  this unrecoverable state
+private boolean hasPendingTargetAssignment() {
+return targetAssignment.isPresent();
 }
 
 private void resetEpoch() {
 this.memberEpoch = 0;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public MemberState state() {
 return state;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public AssignorSelection assignorSelection() {
 return this.assignorSelection;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
 return this.currentAssignment;
 }
 
+
+/**
+ * Assignment that the member received from the server but hasn't completely processed yet.
+ */
 // VisibleForTesting
 Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() {
 return targetAssignment;
 }
 
-// VisibleForTesting
-Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() {
-return nextTargetAssignment;
-}
-
 /**
- * Set the current assignment for the member. This indicates that the reconciliation of the
- * target assignment has been successfully completed.
- * This will clear the {@link #targetAssignment}, and take on the
- * {@link #nextTargetAssignment} if any.
+ * This indicates that the reconciliation of the target assignment has been successfully

Review Comment:
   When and how this logic is invoked is changing in the [reconciler PR](https://github.com/apache/kafka/pull/14357), following our sync to review the HB-reconciler-manager interactions. I would suggest we wait until we settle on that PR and adjust there according to the changes. Does that make sense?






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361411537


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -21,53 +21,76 @@
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * Manages membership of a single member to a consumer group.
+ * 
  * Responsible for:
  * Keeping member state
  * Keeping assignment for the member
  * Computing assignment for the group if the member is required to do so
  */
 public interface MembershipManager {
 
+/**
+ * ID of the consumer group the member is part of (or wants to be part of).
+ */
 String groupId();
 
+/**
+ * Instance ID used by the member when joining the group. If non-empty, it will indicate that
+ * this is a static member.
+ */
 Optional<String> groupInstanceId();
 
+/**
+ * Member ID assigned by the server to this member when it joins the consumer group.
+ */
 String memberId();
 
+/**
+ * Current epoch of the member, maintained by the server.
+ */
 int memberEpoch();
 
+/**
+ * Current state of this member a part of the consumer group, as defined in {@link MemberState}.

Review Comment:
   Done. Just for the record, the idea of the membershipManager supporting a 
NOT_IN_GROUP state does not exist here, but it is introduced in the other 
[PR](https://github.com/apache/kafka/pull/14390) still under review. 






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


lianetm commented on PR #14413:
URL: https://github.com/apache/kafka/pull/14413#issuecomment-1765516635

   Thanks for the comments @philipnee and @dajac, all addressed. There is just 
[this](https://github.com/apache/kafka/pull/14413#discussion_r1361286203) 
comment where I need some clarification, @dajac. Thanks!





Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-17 Thread via GitHub


dajac commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1362082878


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -68,14 +107,16 @@ public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSel
 }
 this.groupInstanceId = Optional.ofNullable(groupInstanceId);
 this.targetAssignment = Optional.empty();
-this.nextTargetAssignment = Optional.empty();
+this.log = logContext.logger(MembershipManagerImpl.class);
 }
 
 /**
  * Update assignor selection for the member.
  *
- * @param assignorSelection New assignor selection
- * @throws IllegalArgumentException If the provided assignor selection is null
+ * @param assignorSelection New assignor selection. If empty is provided, this will
+ *  effectively clear the previous assignor selection defined for the
+ *  member.
+ * @throws IllegalArgumentException If the provided optional assignor selection is null.
  */
 public final void setAssignorSelection(AssignorSelection assignorSelection) {

Review Comment:
   By the way, the assignor can't be changed while the consumer is running, so 
we could probably remove this method or make it private.
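
A minimal sketch of the idea, assuming the assignor is captured once at construction and never reassigned. `FixedAssignorMember` and its members are hypothetical names used only for illustration; they are not the classes touched by this PR.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for a membership manager; not the class under review.
final class FixedAssignorMember {
    private final String groupId;
    // Assigned once in the constructor; there is deliberately no setter.
    private final Optional<String> assignor;

    FixedAssignorMember(String groupId, String assignor) {
        this.groupId = Objects.requireNonNull(groupId, "groupId must not be null");
        this.assignor = Optional.ofNullable(assignor);
    }

    String groupId() {
        return groupId;
    }

    Optional<String> assignor() {
        return assignor;
    }
}
```

Because the field is `final` and no mutator is exposed, callers cannot swap the assignor after the object is built, which matches the constraint described in the comment.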



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,46 +19,85 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member following the new
+ * Membership manager that maintains group membership for a single member, following the new
  * consumer group protocol.
  * 
- * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
- * the member receives. It is also responsible for computing assignment for the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * Keeping member info (ex. member id, member epoch, assignment, etc.)
+ * Keeping member state as defined in {@link MemberState}.
+ * 
+ * Member info and state are updated based on the heartbeat responses the member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+/**
+ * Group ID of the consumer group the member will be part of, provided when creating the current
+ * membership manager.
+ */
 private final String groupId;
+
+/**
+ * Group instance ID to be used by the member, provided when creating the current membership manager.
+ */
 private final Optional<String> groupInstanceId;
+
+/**
+ * Member ID assigned by the server to the member, received in a heartbeat response when
+ * joining the group specified in {@link #groupId}
+ */
 private String memberId;
+
+/**
+ * Current epoch of the member. It will be set to 0 by the member, and provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained by the server,
+ * incremented as the member reconciles and acknowledges the assignments it receives. It will
+ * be reset to 0 if the member gets fenced.
+ */
 private int memberEpoch;
+
+/**
+ * Current state of this member as part of the consumer group, as defined in {@link MemberState}
+ */
 private MemberState state;
+
+/**
+ * Assignor selection configured for the member, that will be sent out to the server on the
+ * {@link ConsumerGroupHeartbeatRequest}. This will default to using server-side assignor,
+ * letting the server choose the specific assignor implementation to use.
+ */
 private AssignorSelection assignorSelection;
 
 /**
  * Assignment that the member received from the server and successfully processed.
  */
 private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
 /**
  * Assignment that the member received from the server but hasn't completely processed
  * yet.
  */
 private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment;
+
 /**
- * Latest assignment that the member received from the server while a {@link #targetAssignment}
- * was in process.
+ * Logger.
  */
-private Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment;
+private final Logger log;
 
-public MembershipManagerImpl(String groupId) {
-this(groupId, null, null);
+public MembershipManagerImpl(String groupId, LogContext logContext) {
+this(groupId, null, null, logContext);
 }
 
-public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection as

Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-17 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1362149712


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -68,14 +107,16 @@ public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSel
 }
 this.groupInstanceId = Optional.ofNullable(groupInstanceId);
 this.targetAssignment = Optional.empty();
-this.nextTargetAssignment = Optional.empty();
+this.log = logContext.logger(MembershipManagerImpl.class);
 }
 
 /**
  * Update assignor selection for the member.
  *
- * @param assignorSelection New assignor selection
- * @throws IllegalArgumentException If the provided assignor selection is null
+ * @param assignorSelection New assignor selection. If empty is provided, this will
+ *  effectively clear the previous assignor selection defined for the
+ *  member.
+ * @throws IllegalArgumentException If the provided optional assignor selection is null.
  */
 public final void setAssignorSelection(AssignorSelection assignorSelection) {

Review Comment:
   Good point, this is only defined when the consumer is created. Removed.






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-17 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1362163867


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,46 +19,85 @@
 
-public MembershipManagerImpl(String groupId) {
-this(groupId, null, null);
+public MembershipManagerImpl(String groupId, LogContext logContext) {
+this(groupId, null, null, logContext);
 }
 
-public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+public MembershipManagerImpl(String groupId,
+ String groupInstanceId,
+ AssignorSelection assignorSelection,

Review Comment:
   Interesting simplification, makes sense to me. So it's fair to say that, from 
the consumer's point of view, there could really be only one Optional 
`clientAssignor`, holding the class name of the client assignor to use, or 
empty (meaning that a server-side assignor will be used and the server will 
choose which one). Is that correct? Sounds good to me.
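
The reading in this comment can be sketched as follows, assuming a single optional client assignor name where empty means the server picks a server-side assignor. `AssignorChoice` and `describe` are hypothetical names for illustration only, and the class name passed in the usage is made up.

```java
import java.util.Optional;

// Hypothetical helper illustrating the "one Optional clientAssignor" reading.
final class AssignorChoice {
    private AssignorChoice() {
    }

    // Reports which side would compute the assignment for the given selection.
    static String describe(Optional<String> clientAssignor) {
        return clientAssignor
            .map(name -> "client-side assignor: " + name)
            .orElse("server-side assignor chosen by the server");
    }
}
```

Calling `AssignorChoice.describe(Optional.empty())` takes the server-side branch, while a present value such as `Optional.of("com.example.MyAssignor")` takes the client-side branch.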






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-17 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1362184481


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,46 +19,85 @@
 
-public MembershipManagerImpl(String groupId) {
-this(groupId, null, null);
+public MembershipManagerImpl(String groupId, LogContext logContext) {
+this(groupId, null, null, logContext);
 }
 
-public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+public MembershipManagerImpl(String groupId,
+ String groupInstanceId,
+ AssignorSelection assignorSelection,

Review Comment:
   Just to double check, this could also include the name of a server-side 
assignor to use, specified by the consumer, right? (to be provided in the HB 
request on the `server_assignor` field).






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-17 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1362184481


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,46 +19,85 @@
 
-public MembershipManagerImpl(String groupId) {
-this(groupId, null, null);
+public MembershipManagerImpl(String groupId, LogContext logContext) {
+this(groupId, null, null, logContext);
 }
 
-public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+public MembershipManagerImpl(String groupId,
+ String groupInstanceId,
+ AssignorSelection assignorSelection,

Review Comment:
   Just to double check, for now this could only include the name of a 
server-side assignor to use, specified by the consumer, right? (to be provided 
in the HB request on the `server_assignor` field).






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-17 Thread via GitHub


lianetm commented on PR #14413:
URL: https://github.com/apache/kafka/pull/14413#issuecomment-1766588965

   Thanks for the comments @dajac, all addressed. I simplified the assignor 
selection following your suggestion, keeping only the `serverAssignor` 
selection for now, since that is all the current 
`ConsumerGroupHeartbeatRequest` supports. We'll follow the evolution of the 
request and include the clientAssignors when the time comes.





Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-18 Thread via GitHub


dajac commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1363518262


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,46 +19,85 @@
 
-public MembershipManagerImpl(String groupId) {
-this(groupId, null, null);
+public MembershipManagerImpl(String groupId, LogContext logContext) {
+this(groupId, null, null, logContext);
 }
 
-public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+public MembershipManagerImpl(String groupId,
+ String groupInstanceId,
+ AssignorSelection assignorSelection,

Review Comment:
   We will only support server-side assignors for now. The config that we need 
to add is a nullable string. When null, the server selects the assignor. 
Otherwise, we pass the name in the HB request as you said.
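
A rough sketch of that rule, assuming a nullable string config that is copied into the heartbeat payload. `HeartbeatSketch` is a hypothetical, simplified stand-in rather than the real request class, and the assignor name used in the usage below is only an example value.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified stand-in for the heartbeat request payload.
final class HeartbeatSketch {
    private final String groupId;
    // null means "no preference": the server selects the assignor.
    private final String serverAssignor;

    private HeartbeatSketch(String groupId, String serverAssignor) {
        this.groupId = Objects.requireNonNull(groupId, "groupId must not be null");
        this.serverAssignor = serverAssignor;
    }

    // Builds the payload from the nullable config value without altering it.
    static HeartbeatSketch fromConfig(String groupId, String configuredAssignor) {
        return new HeartbeatSketch(groupId, configuredAssignor);
    }

    String groupId() {
        return groupId;
    }

    // Empty when the config was null, otherwise the configured assignor name.
    Optional<String> serverAssignor() {
        return Optional.ofNullable(serverAssignor);
    }
}
```

With this shape, `HeartbeatSketch.fromConfig("group-1", null)` carries no assignor name, while `HeartbeatSketch.fromConfig("group-1", "uniform")` carries the name through unchanged.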






Re: [PR] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-18 Thread via GitHub


dajac merged PR #14413:
URL: https://github.com/apache/kafka/pull/14413

