lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361286203
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -19,46 +19,84 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
import java.util.Optional;
/**
- * Membership manager that maintains group membership for a single member following the new
+ * Membership manager that maintains group membership for a single member, following the new
* consumer group protocol.
* <p/>
- * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
- * the member receives. It is also responsible for computing assignment for the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
+ * <li>Keeping member state as defined in {@link MemberState}.</li>
+ * <p/>
+ * Member info and state are updated based on the heartbeat responses the member receives.
*/
public class MembershipManagerImpl implements MembershipManager {
+ /**
+ * ID of the consumer group the member will be part of, provided when creating the current
+ * membership manager.
+ */
private final String groupId;
+
+ /**
+ * Group instance ID to be used by the member, provided when creating the current membership manager.
+ */
private final Optional<String> groupInstanceId;
+
+ /**
+ * Member ID assigned by the server to the member, received in a heartbeat response when
+ * joining the group specified in {@link #groupId}
+ */
private String memberId;
+
+ /**
+ * Current epoch of the member. It will be set to 0 by the member, and provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained by the server,
+ * incremented as the member reconciles and acknowledges the assignments it receives.
Review Comment:
Do you mean on fencing errors? We do reset the epoch to 0 when the member gets
fenced (and that is indeed not explained in the doc).
On fatal errors we currently leave the epoch unchanged, on the assumption that
it won't be used anymore (the member transitions to an unrecoverable FAILED
state where it will not re-join the group). This should probably be
reconsidered now that OffsetFetch/Commit v9 is in sight: we wouldn't want a
FAILED member to include an old epoch in an OffsetFetch/Commit request, right?
Maybe just perform the OffsetFetch/Commit without an epoch?
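
To make the two cases concrete, here is a minimal sketch. It is not the actual
MembershipManagerImpl code: the class name, the State enum, and every method
name are hypothetical. epochForOffsetRequest() shows only one possible answer
to the open question above (omit the epoch once the member is FAILED), which
has not been decided.

```java
import java.util.OptionalInt;

// Hypothetical sketch only; names do not come from the PR.
class MemberEpochSketch {
    enum State { UNJOINED, STABLE, FENCED, FAILED }

    private State state = State.UNJOINED;
    // Starts at 0 so the first heartbeat acts as a join request.
    private int memberEpoch = 0;

    // Successful heartbeat: adopt the epoch maintained by the server.
    void onHeartbeatSuccess(int epochFromServer) {
        memberEpoch = epochFromServer;
        state = State.STABLE;
    }

    // Fencing error: reset the epoch to 0 so the member can re-join.
    void onFenced() {
        memberEpoch = 0;
        state = State.FENCED;
    }

    // Fatal error: the epoch is left unchanged, the member will not re-join.
    void onFatalError() {
        state = State.FAILED;
    }

    // Assumption: a FAILED member sends no epoch in OffsetFetch/Commit,
    // so a stale value is never included in the request.
    OptionalInt epochForOffsetRequest() {
        return state == State.FAILED
                ? OptionalInt.empty()
                : OptionalInt.of(memberEpoch);
    }

    int memberEpoch() {
        return memberEpoch;
    }

    State state() {
        return state;
    }
}
```

With this shape the stale epoch still sits in the field after a fatal error,
but it can no longer leak into a request, so the field itself would not need
resetting.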