Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-16 Thread via GitHub


dajac commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1360767846


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,46 +19,84 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * 
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * Keeping member info (ex. member id, member epoch, assignment, etc.)
+ * Keeping member state as defined in {@link MemberState}.
+ * 
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+/**
+ * ID of the consumer group the member will be part of, provided when 
creating the current

Review Comment:
   nit: Group ID...



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -21,53 +21,76 @@
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * Manages membership of a single member to a consumer group.
+ * 
  * Responsible for:
  * Keeping member state
  * Keeping assignment for the member
  * Computing assignment for the group if the member is required to do 
so
  */
 public interface MembershipManager {
 
+/**
+ * ID of the consumer group the member is part of (or wants to be part of).

Review Comment:
   nit: `Group ID...`? Should we also prefix all those accessors with `@return`?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -196,4 +281,4 @@ private ConsumerGroupHeartbeatResponseData.Assignment 
createAssignment() {
 .setPartitions(Arrays.asList(3, 4, 5))
 ));
 }
-}
+}

Review Comment:
   nit: Let's add an empty line at the end of the file.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as 
targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send 
a new target
+ * member while a previous one hasn't been acknowledged by the member, so 
this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be 
reconciled already" +
+" exists.");
 }
 }
 
-private boolean hasPendingTargetAssignment() {
-return targetAssignment.isPresent() || 
nextTargetAssignment.isPresent();
-}
-
-
-/**
- * Update state and assignment as the member has successfully processed a 
new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and 
makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully 
process
- */
-public void 
onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-updateAssignment(assignment);
-transitionTo(MemberState.STABLE);
-}
-
 /**
- * Update state and member info as the member was not able to process the 
assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided 
callbacks
+ * Returns true if the member has a target assign

Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1353024844


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as 
targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send 
a new target
+ * member while a previous one hasn't been acknowledged by the member, so 
this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be 
reconciled already" +
+" exists.");
 }
 }
 
-private boolean hasPendingTargetAssignment() {
-return targetAssignment.isPresent() || 
nextTargetAssignment.isPresent();
-}
-
-
-/**
- * Update state and assignment as the member has successfully processed a 
new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and 
makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully 
process
- */
-public void 
onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-updateAssignment(assignment);
-transitionTo(MemberState.STABLE);
-}
-
 /**
- * Update state and member info as the member was not able to process the 
assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided 
callbacks
+ * Returns true if the member has a target assignment being processed.
  */
-public void onAssignmentProcessFailure(Throwable error) {
-transitionTo(MemberState.FAILED);
-// TODO: update member info appropriately, to clear up whatever 
shouldn't be kept in
-//  this unrecoverable state
+private boolean hasPendingTargetAssignment() {
+return targetAssignment.isPresent();
 }
 
 private void resetEpoch() {
 this.memberEpoch = 0;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public MemberState state() {
 return state;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
 public AssignorSelection assignorSelection() {
 return this.assignorSelection;
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
 return this.currentAssignment;
 }
 
+
+/**
+ * Assignment that the member received from the server but hasn't 
completely processed yet.
+ */
 // VisibleForTesting
 Optional targetAssignment() 
{
 return targetAssignment;
 }
 
-// VisibleForTesting
-Optional 
nextTargetAssignment() {
-return nextTargetAssignment;
-}
-
 /**
- * Set the current assignment for the member. This indicates that the 
reconciliation of the
- * target assignment has been successfully completed.
- * This will clear the {@link #targetAssignment}, and take on the
- * {@link #nextTargetAssignment} if any.
+ * This indicates that the reconciliation of the target assignment has 
been successfully

Review Comment:
   instead of "this indicates" can we say: "this is invoked when is 
successfully completed"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1353015059


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
 return state.equals(MemberState.STABLE);
 }
 
+/**
+ * Take new target assignment received from the server and set it as 
targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send 
a new target
+ * member while a previous one hasn't been acknowledged by the member, so 
this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
 private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
 if (!targetAssignment.isPresent()) {
+log.debug("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
 targetAssignment = Optional.of(newTargetAssignment);
 } else {
-// Keep the latest next target assignment
-nextTargetAssignment = Optional.of(newTargetAssignment);
+transitionToFailed();
+throw new IllegalStateException("A target assignment pending to be 
reconciled already" +

Review Comment:
   Can we say: "Unable to set target assignment because ... " the help user 
understanding the cause?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1352882295


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -21,53 +21,76 @@
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * Manages membership of a single member to a consumer group.
+ * 
  * Responsible for:
  * Keeping member state
  * Keeping assignment for the member
  * Computing assignment for the group if the member is required to do 
so
  */
 public interface MembershipManager {
 
+/**
+ * ID of the consumer group the member is part of (or wants to be part of).
+ */
 String groupId();
 
+/**
+ * Instance ID used by the member when joining the group. If non-empty, it 
will indicate that
+ * this is a static member.
+ */
 Optional groupInstanceId();
 
+/**
+ * Member ID assigned by the server to this member when it joins the 
consumer group.
+ */
 String memberId();
 
+/**
+ * Current epoch of the member, maintained by the server.
+ */
 int memberEpoch();
 
+/**
+ * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}.

Review Comment:
   Maybe "the current state of the consumer" because it might not be in a group 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


philipnee commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1352876691


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -21,53 +21,76 @@
 import java.util.Optional;
 
 /**
- * Manages group membership for a single member.
+ * Manages membership of a single member to a consumer group.

Review Comment:
   Can we say something like "A stateful object tracking the state of the 
consumer including: "? Membership might mean a lot of different things for 
different people.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1352875373


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * 
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * Keeping member info (ex. member id, member epoch, assignment, etc.)
+ * Keeping member state as defined in {@link MemberState}.
+ * 
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+/**
+ * ID of the consumer group the member will be part of., provided when 
creating the current
+ * membership manager.
+ */
 private final String groupId;
+
+/**
+ * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+ */
 private final Optional groupInstanceId;
+
+/**
+ * Member ID assigned by the server to the member, received in a heartbeat 
response when
+ * joining the group specified in {@link #groupId}
+ */
 private String memberId;
+
+/**
+ * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+ * incremented as the member reconciles and acknowledges the assignments 
it receives.
+ */
 private int memberEpoch;
+
+/**
+ * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}
+ */
 private MemberState state;
+
+/**
+ * Assignor type selection for the member. If non-null, the member will 
send its selection to
+ * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the 
server will select a
+ * default assignor for the member, which the member does not need to 
track.
+ */
 private AssignorSelection assignorSelection;

Review Comment:
   As described in the field doc, this changed to always having a selection, 
that will default to using server-side assignor, and let the server choose the 
specific implementation to use. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


lianetm commented on PR #14413:
URL: https://github.com/apache/kafka/pull/14413#issuecomment-1755659746

   Hey @dajac , this ready for review now, including trunk latest changes. 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]

2023-10-10 Thread via GitHub


lianetm closed pull request #14413: KAFKA-14323: Client state changes for 
handling one assignment at a time & minor improvements
URL: https://github.com/apache/kafka/pull/14413


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org