kirktrue commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1332201750


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * ID of the consumer group the member will be part of., provided when 
creating the current
+     * membership manager.
+     */
     private final String groupId;
+
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
     private final Optional<String> groupInstanceId;
+
+    /**
+     * Member ID assigned by the server to the member, received in a heartbeat 
response when
+     * joining the group specified in {@link #groupId}
+     */
     private String memberId;
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives.
+     */
     private int memberEpoch;
+
+    /**
+     * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}
+     */
     private MemberState state;
+
+    /**
+     * Assignor type selection for the member. If non-null, the member will 
send its selection to
+     * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the 
server will select a
+     * default assignor for the member, which the member does not need to 
track.
+     */
     private AssignorSelection assignorSelection;

Review Comment:
   I don't know where we stand on the `Optional` vs. non-`Optional` debate 
instead of `null`s.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -149,92 +212,84 @@ private boolean maybeTransitionToStable() {
         return state.equals(MemberState.STABLE);
     }
 
+    /**
+     * Take new target assignment received from the server and set it as 
targetAssignment to be
+     * processed. If an assignment is already in process this 
newTargetAssignment will be ignored
+     * for now.
+     */
     private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
         if (!targetAssignment.isPresent()) {
             targetAssignment = Optional.of(newTargetAssignment);
         } else {
-            // Keep the latest next target assignment
-            nextTargetAssignment = Optional.of(newTargetAssignment);
+            log.debug(String.format("Temporarily ignoring assignment %s 
received while member %s " +

Review Comment:
   We don't need the `String.format` here, do we? We should be able to rely on 
the placeholder syntax in the `log.debug` directly.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * ID of the consumer group the member will be part of., provided when 
creating the current
+     * membership manager.
+     */
     private final String groupId;
+
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
     private final Optional<String> groupInstanceId;
+
+    /**
+     * Member ID assigned by the server to the member, received in a heartbeat 
response when
+     * joining the group specified in {@link #groupId}
+     */
     private String memberId;
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives.
+     */
     private int memberEpoch;
+
+    /**
+     * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}
+     */
     private MemberState state;
+
+    /**
+     * Assignor type selection for the member. If non-null, the member will 
send its selection to
+     * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the 
server will select a
+     * default assignor for the member, which the member does not need to 
track.
+     */
     private AssignorSelection assignorSelection;
 
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
     private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
     /**
      * Assignment that the member received from the server but hasn't 
completely processed
      * yet.
      */
     private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+
     /**
-     * Latest assignment that the member received from the server while a 
{@link #targetAssignment}
-     * was in process.
+     * slf4j logger.
      */
-    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
nextTargetAssignment;
+    private final Logger log;
 
-    public MembershipManagerImpl(String groupId) {
-        this(groupId, null, null);
+    public MembershipManagerImpl(String groupId, LogContext logContext) {
+        this(groupId, null, null, logContext);
     }
 
-    public MembershipManagerImpl(String groupId, String groupInstanceId, 
AssignorSelection assignorSelection) {
+    public MembershipManagerImpl(String groupId, String groupInstanceId,
+                                 AssignorSelection assignorSelection, 
LogContext logContext) {
+        if (groupId == null) {
+            throw new IllegalArgumentException("Group ID cannot be null.");

Review Comment:
   I like this extra level of paranoia 😄  This is primarily to catch a 
programming issue for us though, right? A user shouldn't be able to hit this.
   
   ```suggestion
               throw new IllegalArgumentException("Consumer group membership 
should not be created if the group.id is null.");
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * ID of the consumer group the member will be part of., provided when 
creating the current
+     * membership manager.
+     */
     private final String groupId;
+
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
     private final Optional<String> groupInstanceId;
+
+    /**
+     * Member ID assigned by the server to the member, received in a heartbeat 
response when
+     * joining the group specified in {@link #groupId}
+     */
     private String memberId;
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives.
+     */
     private int memberEpoch;
+
+    /**
+     * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}
+     */
     private MemberState state;
+
+    /**
+     * Assignor type selection for the member. If non-null, the member will 
send its selection to
+     * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the 
server will select a
+     * default assignor for the member, which the member does not need to 
track.
+     */
     private AssignorSelection assignorSelection;
 
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
     private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
     /**
      * Assignment that the member received from the server but hasn't 
completely processed
      * yet.
      */
     private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+
     /**
-     * Latest assignment that the member received from the server while a 
{@link #targetAssignment}
-     * was in process.
+     * slf4j logger.

Review Comment:
   I think this is extraneous?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -84,56 +119,84 @@ public void setAssignorSelection(AssignorSelection 
assignorSelection) {
         this.assignorSelection = assignorSelection;
     }
 
+    /**
+     * Update the member state, setting it to the nextState only if it is a 
valid transition.
+     *
+     * @throws IllegalStateException If transitioning from the member {@link 
#state} to the
+     *                               nextState is not allowed as defined in 
{@link MemberState}.
+     */
     private void transitionTo(MemberState nextState) {
         if (!this.state.equals(nextState) && 
!nextState.getPreviousValidStates().contains(state)) {
-            // TODO: handle invalid state transition
-            throw new RuntimeException(String.format("Invalid state transition 
from %s to %s",
+            throw new IllegalStateException(String.format("Invalid state 
transition from %s to %s",
                     state, nextState));
         }
         this.state = nextState;

Review Comment:
   Might I suggest some `log.trace()`-level logging when this state transition 
occurs?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * ID of the consumer group the member will be part of., provided when 
creating the current
+     * membership manager.
+     */
     private final String groupId;
+
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
     private final Optional<String> groupInstanceId;
+
+    /**
+     * Member ID assigned by the server to the member, received in a heartbeat 
response when
+     * joining the group specified in {@link #groupId}
+     */
     private String memberId;
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives.
+     */
     private int memberEpoch;
+
+    /**
+     * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}
+     */
     private MemberState state;
+
+    /**
+     * Assignor type selection for the member. If non-null, the member will 
send its selection to
+     * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the 
server will select a
+     * default assignor for the member, which the member does not need to 
track.
+     */
     private AssignorSelection assignorSelection;
 
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
     private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
     /**
      * Assignment that the member received from the server but hasn't 
completely processed
      * yet.
      */
     private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+
     /**
-     * Latest assignment that the member received from the server while a 
{@link #targetAssignment}
-     * was in process.
+     * slf4j logger.

Review Comment:
   Oh, maybe not. I don't think we need to provide JavaDoc for the loggers, do 
we?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * ID of the consumer group the member will be part of., provided when 
creating the current

Review Comment:
   ```suggestion
        * ID of the consumer group the member will be part of, provided when 
creating the current
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * ID of the consumer group the member will be part of., provided when 
creating the current
+     * membership manager.
+     */
     private final String groupId;
+
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
     private final Optional<String> groupInstanceId;
+
+    /**
+     * Member ID assigned by the server to the member, received in a heartbeat 
response when
+     * joining the group specified in {@link #groupId}
+     */
     private String memberId;
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives.
+     */
     private int memberEpoch;
+
+    /**
+     * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}

Review Comment:
   Typo?
   
   ```suggestion
        * Current state of this member as part of the consumer group, as 
defined in {@link MemberState}
   ```
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,56 +19,91 @@
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 
 /**
- * Membership manager that maintains group membership for a single member 
following the new
+ * Membership manager that maintains group membership for a single member, 
following the new
  * consumer group protocol.
  * <p/>
- * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
- * the member receives. It is also responsible for computing assignment for 
the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the 
member receives.
  */
 public class MembershipManagerImpl implements MembershipManager {
 
+    /**
+     * ID of the consumer group the member will be part of., provided when 
creating the current
+     * membership manager.
+     */
     private final String groupId;
+
+    /**
+     * Group instance ID to be used by the member, provided when creating the 
current membership manager.
+     */
     private final Optional<String> groupInstanceId;
+
+    /**
+     * Member ID assigned by the server to the member, received in a heartbeat 
response when
+     * joining the group specified in {@link #groupId}
+     */
     private String memberId;
+
+    /**
+     * Current epoch of the member. It will be set to 0 by the member, and 
provided to the server
+     * on the heartbeat request, to join the group. It will be then maintained 
by the server,
+     * incremented as the member reconciles and acknowledges the assignments 
it receives.
+     */
     private int memberEpoch;
+
+    /**
+     * Current state of this member a part of the consumer group, as defined 
in {@link MemberState}
+     */
     private MemberState state;
+
+    /**
+     * Assignor type selection for the member. If non-null, the member will 
send its selection to
+     * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the 
server will select a
+     * default assignor for the member, which the member does not need to 
track.
+     */
     private AssignorSelection assignorSelection;
 
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
     private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
     /**
      * Assignment that the member received from the server but hasn't 
completely processed
      * yet.
      */
     private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+
     /**
-     * Latest assignment that the member received from the server while a 
{@link #targetAssignment}
-     * was in process.
+     * slf4j logger.
      */
-    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
nextTargetAssignment;
+    private final Logger log;
 
-    public MembershipManagerImpl(String groupId) {
-        this(groupId, null, null);
+    public MembershipManagerImpl(String groupId, LogContext logContext) {
+        this(groupId, null, null, logContext);
     }
 
-    public MembershipManagerImpl(String groupId, String groupInstanceId, 
AssignorSelection assignorSelection) {
+    public MembershipManagerImpl(String groupId, String groupInstanceId,
+                                 AssignorSelection assignorSelection, 
LogContext logContext) {
+        if (groupId == null) {
+            throw new IllegalArgumentException("Group ID cannot be null.");

Review Comment:
   I like this extra level of paranoia 😄  This is primarily to catch a 
programming issue for us though, right? A user shouldn't be able to hit this.
   
   ```suggestion
               throw new IllegalArgumentException("Consumer group membership 
should not be created if the group.id is null.");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to