Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-28 Thread via GitHub


lianetm commented on PR #14690:
URL: https://github.com/apache/kafka/pull/14690#issuecomment-1830640924

   Follow-up PR https://github.com/apache/kafka/pull/14857


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-28 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1408361923


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,113 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and 
release its
+ * assignment (calling user's callback for partitions revoked or lost). 
When all these
+ * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+ * effectively leave the group.
  */
-FAILED;
+PREPARE_LEAVING,
 
+/**
+ * Member has committed offsets and releases its assignment, so it stays 
in this state until
+ * the next heartbeat request is sent out with epoch -1 to effectively 
leave the group. This
+ 
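
The Javadoc in the hunk quoted above ties each state to a heartbeat behavior. A minimal sketch of that mapping follows. `MemberStateHeartbeatSketch`, `Policy`, `policyFor` and `shouldSendHeartbeat` are hypothetical names for illustration and are not part of the PR. The quoted text is cut off partway through the `LEAVING` description, so treating `LEAVING` as an immediate heartbeat is an assumption.

```java
import java.util.EnumMap;
import java.util.Map;

public class MemberStateHeartbeatSketch {

    /** States named in the quoted Javadoc (stand-in for the real MemberState enum). */
    public enum State {
        UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE, FENCED, PREPARE_LEAVING, LEAVING
    }

    /** Hypothetical classification of how a state treats heartbeats. */
    public enum Policy { NONE, ON_INTERVAL, IMMEDIATE }

    private static final Map<State, Policy> POLICIES = new EnumMap<>(State.class);

    static {
        POLICIES.put(State.UNSUBSCRIBED, Policy.NONE);        // "no heartbeats sent"
        POLICIES.put(State.JOINING, Policy.ON_INTERVAL);      // heartbeats on the interval, epoch 0
        POLICIES.put(State.RECONCILING, Policy.ON_INTERVAL);  // keeps heartbeating while reconciling
        POLICIES.put(State.ACKNOWLEDGING, Policy.IMMEDIATE);  // ack without waiting for the interval
        POLICIES.put(State.STABLE, Policy.ON_INTERVAL);       // heartbeats on the interval
        POLICIES.put(State.FENCED, Policy.NONE);              // stops sending heartbeats
        POLICIES.put(State.PREPARE_LEAVING, Policy.NONE);     // stops sending heartbeats
        POLICIES.put(State.LEAVING, Policy.IMMEDIATE);        // assumption: quoted text is truncated here
    }

    public static Policy policyFor(State state) {
        return POLICIES.get(state);
    }

    /** True if a heartbeat should go out now, given whether the interval has expired. */
    public static boolean shouldSendHeartbeat(State state, boolean intervalExpired) {
        switch (policyFor(state)) {
            case IMMEDIATE:
                return true;
            case ON_INTERVAL:
                return intervalExpired;
            default:
                return false;
        }
    }
}
```

Keeping the mapping in one table makes it easy to compare against the per-state Javadoc when a state is added or renamed.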

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-28 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1408357542


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -192,6 +221,26 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
         });
     }
 
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) {
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result = new ArrayList<>();
+        Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>();
+        for (TopicIdPartition topicIdPartition : topicIdPartitions) {
+            Uuid topicId = topicIdPartition.topicId();
+            if (!partitionsPerTopicId.containsKey(topicId)) {
+                partitionsPerTopicId.put(topicId, new ArrayList<>());
+            }
+            partitionsPerTopicId.get(topicId).add(topicIdPartition.partition());

Review Comment:
   Agreed. It went away anyway once everything was simplified to use TopicPartitions.
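
   For reference, the `containsKey`/`put` pair in the hunk quoted above is the kind of pattern that `Map.computeIfAbsent` collapses into one call. A minimal sketch, assuming stand-in types: `TopicPartitionGrouping` and its nested `TopicIdPartition` are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`. This is not the code from the follow-up PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class TopicPartitionGrouping {

    /** Hypothetical stand-in for Kafka's TopicIdPartition (topic id plus partition index). */
    public static final class TopicIdPartition {
        final UUID topicId;
        final int partition;

        public TopicIdPartition(UUID topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }
    }

    /** Groups partition indexes by topic id, keeping topics in first-seen order. */
    public static Map<UUID, List<Integer>> groupByTopicId(Iterable<TopicIdPartition> partitions) {
        Map<UUID, List<Integer>> partitionsPerTopicId = new LinkedHashMap<>();
        for (TopicIdPartition tp : partitions) {
            // computeIfAbsent creates the list on first sight of a topic id and returns it,
            // replacing the separate containsKey/put/get sequence.
            partitionsPerTopicId.computeIfAbsent(tp.topicId, id -> new ArrayList<>()).add(tp.partition);
        }
        return partitionsPerTopicId;
    }
}
```

   Each entry of the returned map corresponds to one `TopicPartitions` element in the request data.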






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-28 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1408355119


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -192,6 +221,26 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
         });
     }
 
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) {
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result = new ArrayList<>();
+        Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>();
+        for (TopicIdPartition topicIdPartition : topicIdPartitions) {
+            Uuid topicId = topicIdPartition.topicId();
+            if (!partitionsPerTopicId.containsKey(topicId)) {
+                partitionsPerTopicId.put(topicId, new ArrayList<>());
+            }
+            partitionsPerTopicId.get(topicId).add(topicIdPartition.partition());
+        }
+        for (Map.Entry<Uuid, List<Integer>> entry : partitionsPerTopicId.entrySet()) {
+            Uuid topicId = entry.getKey();
+            List<Integer> partitions = entry.getValue();
+            result.add(new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(topicId)
+                    .setPartitions(partitions));
+        }

Review Comment:
   Definitely, done in the [follow-up PR](https://github.com/apache/kafka/pull/14857).






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-28 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1408354526


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -168,76 +329,686 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 this.memberId = response.memberId();
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+
 if (assignment != null) {
-setTargetAssignment(assignment);
+transitionTo(MemberState.RECONCILING);
+replaceUnresolvedAssignmentWithNewAssignment(assignment);
+resolveMetadataForUnresolvedAssignment();
+reconcile();
+} else if (allPendingAssignmentsReconciled()) {
+transitionTo(MemberState.STABLE);
 }
-maybeTransitionToStable();
+}
+
+/**
+ * Overwrite collection of unresolved topic Ids with the new target 
assignment. This will
+ * effectively achieve the following:
+ *
+ *- all topics received in assignment will try to be resolved to find 
their topic names
+ *
+ *- any topic received in a previous assignment that was still 
unresolved, and that is
+ *not included in the assignment anymore, will be removed from the 
unresolved collection.
+ *This should be the case when a topic is sent in an assignment, 
deleted right after, and
+ *removed from the assignment the next time a broker sends one to the 
member.
+ *
+ * @param assignment Target assignment received from the broker.
+ */
+private void replaceUnresolvedAssignmentWithNewAssignment(
+ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+assignmentUnresolved.clear();
+assignment.topicPartitions().forEach(topicPartitions ->
+assignmentUnresolved.put(topicPartitions.topicId(), 
topicPartitions.partitions()));
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
 public void transitionToFenced() {
-resetEpoch();
 transitionTo(MemberState.FENCED);
+resetEpoch();
+log.debug("Member {} with epoch {} transitioned to {} state. It will 
release its " +
+"assignment and rejoin the group.", memberId, memberEpoch, 
MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+" after member got fenced. Member will rejoin the 
group anyways.", error);
+}
+updateSubscription(Collections.emptySet(), true);
+transitionToJoining();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+transitionTo(MemberState.FATAL);
+log.error("Member {} with epoch {} transitioned to {} state", 
memberId, memberEpoch, MemberState.FATAL);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member failed with fatal error.", error);
+}
+updateSubscription(Collections.emptySet(), true);
+});
 }
 
+/**
+ * {@inheritDoc}
+ */
+public void onSubscriptionUpdated() {
+if (state == MemberState.UNSUBSCRIBED) {
+transitionToJoining();
+}
+}
+
+/**
+ * Update a new assignment by setting the assigned partitions in the 
member subscription.
+ *
+ * @param assignedPartitions Topic partitions to take as the new 
subscription assignment
+ * @param clearAssignments True if the
+ */
+private void updateSubscription(Collection 
assignedPartitions,
+boolean clearAssignments) {
+subscriptions.assignFromSubscribed(assignedPartitions);
+if (clearAssignments) {
+clearPendingAssignmentsAndLocalNamesCache();
+}
+}
+
+/**
+ * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+ * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+ * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenc
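
The `transitionToFenced` hunk quoted above releases the assignment through the `onPartitionsLost` callback and rejoins whether or not the callback succeeded. A minimal sketch of that "log and carry on" shape using `CompletableFuture.whenComplete`. `FencedReleaseSketch` and its fields are hypothetical stand-ins, partitions are plain strings, and resetting the epoch to 0 is an assumption based on the `JOINING` Javadoc ("epoch 0").

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class FencedReleaseSketch {

    public enum State { STABLE, FENCED, JOINING }

    private State state = State.STABLE;
    private int memberEpoch;
    private final Set<String> assignedPartitions = new HashSet<>();
    private final List<String> loggedErrors = new ArrayList<>();

    public FencedReleaseSketch(int memberEpoch, Set<String> assigned) {
        this.memberEpoch = memberEpoch;
        this.assignedPartitions.addAll(assigned);
    }

    /**
     * @param onPartitionsLostResult future that completes when the user callback finishes
     */
    public void transitionToFenced(CompletableFuture<Void> onPartitionsLostResult) {
        state = State.FENCED;
        memberEpoch = 0; // assumption: resetEpoch() puts the epoch back to the join value
        onPartitionsLostResult.whenComplete((result, error) -> {
            if (error != null) {
                // A callback failure is recorded but does not block the rejoin.
                loggedErrors.add("onPartitionsLost failed: " + error.getMessage());
            }
            assignedPartitions.clear();
            state = State.JOINING;
        });
    }

    public State state() { return state; }
    public int memberEpoch() { return memberEpoch; }
    public Set<String> assignedPartitions() { return assignedPartitions; }
    public List<String> loggedErrors() { return loggedErrors; }
}
```

Because the cleanup runs inside `whenComplete`, the member stays in `FENCED` with its partitions until the callback future completes, and only then moves to `JOINING`.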

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-28 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1408353902


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -168,76 +329,686 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 this.memberId = response.memberId();
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+
 if (assignment != null) {
-setTargetAssignment(assignment);
+transitionTo(MemberState.RECONCILING);
+replaceUnresolvedAssignmentWithNewAssignment(assignment);
+resolveMetadataForUnresolvedAssignment();
+reconcile();
+} else if (allPendingAssignmentsReconciled()) {
+transitionTo(MemberState.STABLE);
 }
-maybeTransitionToStable();
+}
+
+/**
+ * Overwrite collection of unresolved topic Ids with the new target 
assignment. This will
+ * effectively achieve the following:
+ *
+ *- all topics received in assignment will try to be resolved to find 
their topic names
+ *
+ *- any topic received in a previous assignment that was still 
unresolved, and that is
+ *not included in the assignment anymore, will be removed from the 
unresolved collection.
+ *This should be the case when a topic is sent in an assignment, 
deleted right after, and
+ *removed from the assignment the next time a broker sends one to the 
member.
+ *
+ * @param assignment Target assignment received from the broker.
+ */
+private void replaceUnresolvedAssignmentWithNewAssignment(
+ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+assignmentUnresolved.clear();
+assignment.topicPartitions().forEach(topicPartitions ->
+assignmentUnresolved.put(topicPartitions.topicId(), 
topicPartitions.partitions()));
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
 public void transitionToFenced() {
-resetEpoch();
 transitionTo(MemberState.FENCED);
+resetEpoch();
+log.debug("Member {} with epoch {} transitioned to {} state. It will 
release its " +
+"assignment and rejoin the group.", memberId, memberEpoch, 
MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+" after member got fenced. Member will rejoin the 
group anyways.", error);
+}
+updateSubscription(Collections.emptySet(), true);
+transitionToJoining();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+transitionTo(MemberState.FATAL);
+log.error("Member {} with epoch {} transitioned to {} state", 
memberId, memberEpoch, MemberState.FATAL);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member failed with fatal error.", error);
+}
+updateSubscription(Collections.emptySet(), true);
+});
 }
 
+/**
+ * {@inheritDoc}
+ */
+public void onSubscriptionUpdated() {
+if (state == MemberState.UNSUBSCRIBED) {
+transitionToJoining();
+}
+}
+
+/**
+ * Update a new assignment by setting the assigned partitions in the 
member subscription.
+ *
+ * @param assignedPartitions Topic partitions to take as the new 
subscription assignment
+ * @param clearAssignments True if the
+ */
+private void updateSubscription(Collection 
assignedPartitions,
+boolean clearAssignments) {
+subscriptions.assignFromSubscribed(assignedPartitions);
+if (clearAssignments) {
+clearPendingAssignmentsAndLocalNamesCache();
+}
+}
+
+/**
+ * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+ * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+ * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenc
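
The Javadoc for `replaceUnresolvedAssignmentWithNewAssignment` in the hunk quoted above describes a clear-then-fill replacement: every topic in the new target becomes unresolved, and any previously unresolved topic that is missing from the new target is dropped. A minimal sketch of that behavior, assuming stand-in types: `UnresolvedAssignmentSketch`, `replaceWith` and `unresolvedTopicIds` are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class UnresolvedAssignmentSketch {

    // Topic id -> partitions still waiting for the topic name to be resolved.
    private final Map<UUID, List<Integer>> assignmentUnresolved = new HashMap<>();

    /** Replaces the unresolved collection with the new target assignment. */
    public void replaceWith(Map<UUID, List<Integer>> newTargetAssignment) {
        // Clearing first is what drops topics that were pending before but are
        // absent from the new target (for example, a topic deleted in between).
        assignmentUnresolved.clear();
        newTargetAssignment.forEach((topicId, partitions) ->
                assignmentUnresolved.put(topicId, new ArrayList<>(partitions)));
    }

    public Set<UUID> unresolvedTopicIds() {
        return new HashSet<>(assignmentUnresolved.keySet());
    }
}
```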

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-20 Thread via GitHub


dajac merged PR #14690:
URL: https://github.com/apache/kafka/pull/14690





Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-20 Thread via GitHub


dajac commented on PR #14690:
URL: https://github.com/apache/kafka/pull/14690#issuecomment-1818883304

   I just merged trunk to fix conflicts. We can merge the PR once the build completes.





Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-20 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1398998763


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -137,45 +140,71 @@ public HeartbeatRequestManager(
 }
 
 /**
- * Determines the maximum wait time until the next poll based on the 
member's state, and creates a heartbeat
- * request.
+ * This will build a heartbeat request if one must be sent, determined 
based on the member
+ * state. A heartbeat is sent in the following situations:
+ * 
+ * Member is part of the consumer group or wants to join it.
+ * The heartbeat interval has expired, or the member is in a state 
that indicates
+ * that it should heartbeat without waiting for the interval.
+ * 
+ * This will also determine the maximum wait time until the next poll 
based on the member's
+ * state.
  * 
- * If the member is without a coordinator or is in a failed state, 
the timer is set to Long.MAX_VALUE, as there's no need to send a heartbeat.
- * If the member cannot send a heartbeat due to either exponential 
backoff, it will return the remaining time left on the backoff timer.
- * If the member's heartbeat timer has not expired, It will return 
the remaining time left on the
- * heartbeat timer.
+ * If the member is without a coordinator or is in a failed state, 
the timer is set
+ * to Long.MAX_VALUE, as there's no need to send a heartbeat.
+ * If the member cannot send a heartbeat due to either exponential 
backoff, it will
+ * return the remaining time left on the backoff timer.
+ * If the member's heartbeat timer has not expired, It will return 
the remaining time
+ * left on the heartbeat timer.
  * If the member can send a heartbeat, the timer is set to the 
current heartbeat interval.
  * 
+ *
+ * @return {@link PollResult} that includes a heartbeat request if one 
must be sent, and the
+ * time to wait until the next poll.
  */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || 
!membershipManager.shouldSendHeartbeat())
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.shouldSkipHeartbeat()) {
+membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
+}
+
+boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight();
 
-// TODO: We will need to send a heartbeat response after partitions 
being revoke. This needs to be
-//  implemented either with or after the partition reconciliation 
logic.
-if (!heartbeatRequestState.canSendRequest(currentTimeMs))
+if (!heartbeatRequestState.canSendRequest(currentTimeMs) && 
!heartbeatNow) {
 return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
+}
 
-this.heartbeatRequestState.onSendAttempt(currentTimeMs);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();
 NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest();
 return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
 }
 
 private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
-// TODO: We only need to send the rebalanceTimeoutMs field once unless 
the first request failed.
+// TODO: extract this logic for building the 
ConsumerGroupHeartbeatRequestData to a
+//  stateful builder (HeartbeatState), that will keep the last data 
sent, and determine
+//  the fields that changed and need to be included in the next HB 
(ex. check
+//  subscriptionState changed from last sent to include assignment). 
It should also
+//  ensure that all fields are sent on failure.
 ConsumerGroupHeartbeatRequestData data = new 
ConsumerGroupHeartbeatRequestData()
 .setGroupId(membershipManager.groupId())
 .setMemberEpoch(membershipManager.memberEpoch())
-.setMemberId(membershipManager.memberId())
 .setRebalanceTimeoutMs(rebalanceTimeoutMs);
 
+if (membershipManager.memberId() != null) {
+data.setMemberId(membershipManager.memberId());
+}
+
 membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
 if (this.subscriptions.hasPatternSubscription()) {
 // TODO: Pass the string to the GC if server side regex is used.
 } else {
 data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
+List 
topicPartition
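
The added `poll` logic in the hunk quoted above reduces to three outcomes: skip, wait, or send. A minimal sketch of that decision as a pure function, with the manager's collaborators replaced by boolean inputs. `HeartbeatPollSketch` and `Decision` are hypothetical names. Using `Long.MAX_VALUE` as the wait time for the skip case follows the Javadoc in the hunk, not an inspection of `PollResult.EMPTY`.

```java
public class HeartbeatPollSketch {

    /** Hypothetical stand-in for PollResult: whether to send, and how long until the next poll. */
    public static final class Decision {
        public final boolean sendHeartbeat;
        public final long timeUntilNextPollMs;

        Decision(boolean sendHeartbeat, long timeUntilNextPollMs) {
            this.sendHeartbeat = sendHeartbeat;
            this.timeUntilNextPollMs = timeUntilNextPollMs;
        }
    }

    public static Decision poll(boolean coordinatorKnown,
                                boolean shouldSkipHeartbeat,
                                boolean shouldHeartbeatNow,
                                boolean requestInFlight,
                                boolean canSendRequest,
                                long remainingMs,
                                long heartbeatIntervalMs) {
        // 1. No coordinator, or the member state says no heartbeat is needed: nothing to send.
        if (!coordinatorKnown || shouldSkipHeartbeat) {
            return new Decision(false, Long.MAX_VALUE);
        }
        // 2. An immediate heartbeat only applies when no request is already in flight.
        boolean heartbeatNow = shouldHeartbeatNow && !requestInFlight;
        if (!canSendRequest && !heartbeatNow) {
            return new Decision(false, remainingMs);
        }
        // 3. Send, and schedule the next poll one heartbeat interval away.
        return new Decision(true, heartbeatIntervalMs);
    }
}
```

The `!requestInFlight` guard is what keeps an immediate-heartbeat state from sending a second request while the first is still pending.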

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396826235


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member failed with fatal error.", error);
+}
+});
+subscriptions.assignFromSubscribed(Collections.emptySet());
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
+}
+
+/**
+ * {@inheritDoc}
+ */
+public void onSubscriptionUpdated() {
+if (state == MemberState.UNSUBSCRIBED) {
+transitionToJoining();
+}
+// TODO: If the member is already part of the group, this should only 
ensure that the
+//  updated subscription is included in the next heartbeat request.
 }
 
+/**
+ * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+ * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+ * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+ * Visible for testing.
+ */
+void transitionToJoining() {
+if (state == MemberState.FATAL) {
+log.warn("No action taken to join the group with the updated 
subscription because " +
+"the member is in FATAL state");
+return;
+}
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+registerForMetadataUpdates();
+}
+
+/**
+ * Register to get notified when the cluster metadata is updated, via the
+ * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
register already.
+ */
+private void registerForMetadataUpdates() {
+if (!isRegisteredForMetadataUpdates) {
+this.metadata.addClusterUpdateListener(this);
+isRegisteredForMetadataUpdates = true;
+}
+}
+
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public CompletableFuture leaveGroup() {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+// Member is not part of the group. No-op and return completed 
future to avoid
+// unnecessary transitions.
+return CompletableFuture.completedFuture(null);
+}
+
+if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+// Member already leaving. No-op and return existing leave group 
future that will
+// complete when the ongoing leave operation completes.
+return leaveGroupInProgress.get();
+}
+
+transitionTo(MemberState.PREPARE_LEAVING);
+leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscr
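
The `leaveGroup` hunk quoted above makes the call idempotent: a member outside the group gets an already-completed future, and a member that is already leaving gets the future of the ongoing operation. A minimal sketch of that guard, assuming a hypothetical `LeaveGroupSketch` class with the callback result passed in as a parameter. The quoted hunk is cut off inside `whenComplete`, so completing the in-progress future there is an assumption, not something the excerpt shows.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class LeaveGroupSketch {

    public enum State { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, FATAL }

    private State state;
    private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();

    public LeaveGroupSketch(State initial) {
        this.state = initial;
    }

    /**
     * @param callbackResult future that completes when the revoke/lost callback finishes
     */
    public CompletableFuture<Void> leaveGroup(CompletableFuture<Void> callbackResult) {
        if (state == State.UNSUBSCRIBED || state == State.FATAL) {
            // Not part of the group: nothing to do.
            return CompletableFuture.completedFuture(null);
        }
        if (state == State.PREPARE_LEAVING || state == State.LEAVING) {
            // Already leaving: hand back the future of the ongoing operation.
            return leaveGroupInProgress.get();
        }
        state = State.PREPARE_LEAVING;
        CompletableFuture<Void> inProgress = new CompletableFuture<>();
        leaveGroupInProgress = Optional.of(inProgress);
        callbackResult.whenComplete((result, error) -> {
            // Move on whether or not the callback failed (assumed completion point).
            state = State.LEAVING;
            inProgress.complete(null);
        });
        return inProgress;
    }

    public State state() { return state; }
}
```

Returning the same future to repeated callers means they all observe one leave operation instead of triggering the callbacks again.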

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396494887


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  
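
The Javadoc for `invokeOnPartitionsRevokedOrLostToReleaseAssignment` in the hunk quoted above picks the callback from the member epoch: `onPartitionsRevoked` when the member is still part of the group (epoch > 0), `onPartitionsLost` otherwise, and nothing when there is no assignment to release. A minimal sketch of that selection rule. `ReleaseCallbackSketch`, `Callback` and `callbackFor` are hypothetical names, and partitions are plain strings.

```java
import java.util.Set;

public class ReleaseCallbackSketch {

    /** Hypothetical marker for which user callback would be invoked. */
    public enum Callback { NONE, ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    public static Callback callbackFor(int memberEpoch, Set<String> assignedPartitions) {
        if (assignedPartitions.isEmpty()) {
            // No assignment to release, so no callback is needed.
            return Callback.NONE;
        }
        // epoch > 0: still in the group and leaving intentionally -> revoked.
        // epoch <= 0: no longer in the group (for example, fenced) -> lost.
        return memberEpoch > 0 ? Callback.ON_PARTITIONS_REVOKED : Callback.ON_PARTITIONS_LOST;
    }
}
```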

effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396474166


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396462214


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,11 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {

Review Comment:
   I updated it to align with the current behaviour: callbacks, a best-effort 
attempt to send the leave group request without any response handling or retry, 
and a call to `subscriptions.unsubscribe` when everything completes. The gap is 
the callback execution, which would require a poll. Given that we don't support 
callbacks in this PR, it won't block the flow, but it definitely needs to be 
solved (I added the details of the challenge to the [callbacks 
Jira](https://issues.apache.org/jira/browse/KAFKA-15276)).
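
   As a rough illustration of the ordering described in this comment, here is a 
minimal sketch. The class and method names are hypothetical stand-ins, not the 
PR's code. It assumes the caller chains on the stage returned by `whenComplete`, 
so that `subscriptions.unsubscribe` runs strictly after the release and the 
best-effort leave request. It does not model the poll that callback execution 
would need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: these names are illustrative and do not come from the PR.
class UnsubscribeFlowSketch {
    final List<String> events = new ArrayList<>();

    // Stand-in for the leave operation. The returned stage completes once the
    // callback has finished and the follow-up steps have run; it does not wait
    // for any heartbeat response.
    CompletableFuture<Void> leaveGroup(CompletableFuture<Void> callbackResult) {
        return callbackResult.whenComplete((result, error) -> {
            events.add("assignment-released");
            // Best effort: request the leave heartbeat, no response handling or retry.
            events.add("leave-heartbeat-requested");
        });
    }

    // Stand-in for the consumer's unsubscribe(): clear the subscription last.
    CompletableFuture<Void> unsubscribe(CompletableFuture<Void> callbackResult) {
        return leaveGroup(callbackResult)
            .whenComplete((result, error) -> events.add("subscriptions.unsubscribe"));
    }

    public static void main(String[] args) {
        UnsubscribeFlowSketch sketch = new UnsubscribeFlowSketch();
        CompletableFuture<Void> callback = new CompletableFuture<>();
        sketch.unsubscribe(callback);
        callback.complete(null); // simulate the user callback finishing
        System.out.println(sketch.events);
    }
}
```

   Chaining on the derived stage (rather than registering two actions on the 
same future) is a choice made for this sketch: `CompletableFuture` does not 
guarantee the relative order of several dependents registered on one future.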






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r139605


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396013115


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member failed with fatal error.", error);
+}
+});
+subscriptions.assignFromSubscribed(Collections.emptySet());
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
+}
+
+/**
+ * {@inheritDoc}
+ */
+public void onSubscriptionUpdated() {
+if (state == MemberState.UNSUBSCRIBED) {
+transitionToJoining();
+}
+// TODO: If the member is already part of the group, this should only 
ensure that the
+//  updated subscription is included in the next heartbeat request.
 }
 
+/**
+ * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+ * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+ * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+ * Visible for testing.
+ */
+void transitionToJoining() {
+if (state == MemberState.FATAL) {
+log.warn("No action taken to join the group with the updated 
subscription because " +
+"the member is in FATAL state");
+return;
+}
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+registerForMetadataUpdates();
+}
+
+/**
+ * Register to get notified when the cluster metadata is updated, via the
+ * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
register already.
+ */
+private void registerForMetadataUpdates() {
+if (!isRegisteredForMetadataUpdates) {
+this.metadata.addClusterUpdateListener(this);
+isRegisteredForMetadataUpdates = true;
+}
+}
+
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public CompletableFuture leaveGroup() {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+// Member is not part of the group. No-op and return completed 
future to avoid
+// unnecessary transitions.
+return CompletableFuture.completedFuture(null);
+}
+
+if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+// Member already leaving. No-op and return existing leave group 
future that will
+// complete when the ongoing leave operation completes.
+return leaveGroupInProgress.get();
+}
+
+transitionTo(MemberState.PREPARE_LEAVING);
+leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscr

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396006392


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member failed with fatal error.", error);
+}
+});
+subscriptions.assignFromSubscribed(Collections.emptySet());
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
+}
+
+/**
+ * {@inheritDoc}
+ */
+public void onSubscriptionUpdated() {
+if (state == MemberState.UNSUBSCRIBED) {
+transitionToJoining();
+}
+// TODO: If the member is already part of the group, this should only 
ensure that the
+//  updated subscription is included in the next heartbeat request.
 }
 
+/**
+ * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+ * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+ * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+ * Visible for testing.
+ */
+void transitionToJoining() {
+if (state == MemberState.FATAL) {
+log.warn("No action taken to join the group with the updated 
subscription because " +
+"the member is in FATAL state");
+return;
+}
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+registerForMetadataUpdates();
+}
+
+/**
+ * Register to get notified when the cluster metadata is updated, via the
+ * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
register already.
+ */
+private void registerForMetadataUpdates() {
+if (!isRegisteredForMetadataUpdates) {
+this.metadata.addClusterUpdateListener(this);
+isRegisteredForMetadataUpdates = true;
+}
+}
+
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public CompletableFuture leaveGroup() {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+// Member is not part of the group. No-op and return completed 
future to avoid
+// unnecessary transitions.
+return CompletableFuture.completedFuture(null);
+}
+
+if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+// Member already leaving. No-op and return existing leave group 
future that will
+// complete when the ongoing leave operation completes.
+return leaveGroupInProgress.get();
+}
+
+transitionTo(MemberState.PREPARE_LEAVING);
+leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscr

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395947490


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member failed with fatal error.", error);
+}
+});
+subscriptions.assignFromSubscribed(Collections.emptySet());
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
+}
+
+/**
+ * {@inheritDoc}
+ */
+public void onSubscriptionUpdated() {
+if (state == MemberState.UNSUBSCRIBED) {
+transitionToJoining();
+}
+// TODO: If the member is already part of the group, this should only 
ensure that the
+//  updated subscription is included in the next heartbeat request.
 }
 
+/**
+ * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+ * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+ * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+ * Visible for testing.
+ */
+void transitionToJoining() {
+if (state == MemberState.FATAL) {
+log.warn("No action taken to join the group with the updated 
subscription because " +
+"the member is in FATAL state");
+return;
+}
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+registerForMetadataUpdates();
+}
+
+/**
+ * Register to get notified when the cluster metadata is updated, via the
+ * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
register already.
+ */
+private void registerForMetadataUpdates() {
+if (!isRegisteredForMetadataUpdates) {
+this.metadata.addClusterUpdateListener(this);
+isRegisteredForMetadataUpdates = true;
+}
+}
+
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public CompletableFuture leaveGroup() {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+// Member is not part of the group. No-op and return completed 
future to avoid
+// unnecessary transitions.
+return CompletableFuture.completedFuture(null);
+}
+
+if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+// Member already leaving. No-op and return existing leave group 
future that will
+// complete when the ongoing leave operation completes.
+return leaveGroupInProgress.get();
+}
+
+transitionTo(MemberState.PREPARE_LEAVING);
+leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscr

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395929369


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+        // TODO: If the member is already part of the group, this should only ensure that the
+        //  updated subscription is included in the next heartbeat request.

Review Comment:
   Agreed, the next heartbeat (HB) will pick it up based on the interval (I also 
don't see much need or value in a forced HB). Removed the TODO.
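
   A minimal sketch of that behaviour, under the assumption that the heartbeat 
request is built from whatever the subscription is at send time. The class, 
its fields and the millisecond clock are hypothetical and only illustrate the 
idea; this is not the PR's heartbeat manager.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: an updated subscription is not sent immediately; it is
// included in the next heartbeat once the regular interval has elapsed.
class HeartbeatIntervalSketch {
    private final long intervalMs;
    private long lastSentMs;
    private Set<String> subscription = new TreeSet<>();

    HeartbeatIntervalSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastSentMs = nowMs;
    }

    // Records the new subscription only; no forced heartbeat is triggered.
    void onSubscriptionUpdated(Set<String> topics) {
        subscription = new TreeSet<>(topics);
    }

    // Returns the topics carried by the heartbeat if one is due, empty otherwise.
    Optional<Set<String>> maybeSendHeartbeat(long nowMs) {
        if (nowMs - lastSentMs < intervalMs) {
            return Optional.empty();
        }
        lastSentMs = nowMs;
        return Optional.of(new TreeSet<>(subscription));
    }

    public static void main(String[] args) {
        HeartbeatIntervalSketch hb = new HeartbeatIntervalSketch(1000L, 0L);
        hb.onSubscriptionUpdated(Set.of("topic-a"));
        System.out.println(hb.maybeSendHeartbeat(500L));
        System.out.println(hb.maybeSendHeartbeat(1000L));
    }
}
```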






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395912856


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);

Review Comment:
   No reason. Updated it to be consistent with the fencing transition.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395907406


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());

Review Comment:
   No reason, moved it to after the callback completes, consistent with how it 
is done on fencing, leave and reconcile
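
The ordering described in this comment can be sketched as follows. This is only a minimal illustration, assuming the cleanup is attached with `CompletableFuture.whenComplete`; the class name `ReleaseAfterCallbackSketch`, the `run` helper, and the event strings are hypothetical and not part of the PR. It shows that a step placed inside `whenComplete` runs only once the callback future finishes, whether the callback succeeded or failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the PR's code: records the order in which
// the callback result and the assignment cleanup are observed.
final class ReleaseAfterCallbackSketch {

    static List<String> run(boolean callbackFails) {
        List<String> events = new ArrayList<>();

        // Stand-in for the future returned by the user callback invocation.
        CompletableFuture<Void> callbackResult = new CompletableFuture<>();

        // Cleanup is registered on the future, so it runs after completion,
        // regardless of whether the callback succeeded or threw.
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                events.add("callback failed");
            }
            events.add("assignment cleared");
        });

        // At this point the callback has not completed and nothing was cleared.
        events.add("callback pending");

        if (callbackFails) {
            callbackResult.completeExceptionally(new RuntimeException("listener error"));
        } else {
            callbackResult.complete(null);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In both runs "assignment cleared" is recorded after "callback pending", which is the ordering the comment says is used on fencing, leave and reconcile.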






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395894946


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.error("onPartitionsLost callback invocation failed while releasing assignment" +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;

Review Comment:
   You're right, not needed anymore. The member will transition to fatal state 
but can keep its last member ID and epoch. 
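
One reading of why the epoch update is not needed: elsewhere in this thread the quoted javadoc says the revoked-or-lost choice is derived from the epoch (epoch > 0 gives onPartitionsRevoked, otherwise onPartitionsLost), whereas the fatal transition invokes onPartitionsLost directly. The sketch below illustrates that reading only. The `FatalTransitionSketch` class, its `Member` type, and the string-valued states are hypothetical simplifications, not the actual `MembershipManagerImpl`.

```java
// Hypothetical sketch, not the PR's code.
final class FatalTransitionSketch {

    /** Epoch-based choice, as described in the quoted javadoc for releasing an assignment. */
    static String callbackForRelease(int memberEpoch) {
        return memberEpoch > 0 ? "onPartitionsRevoked" : "onPartitionsLost";
    }

    static final class Member {
        final String memberId;
        final int memberEpoch;
        String state = "STABLE";
        String lastCallback;

        Member(String memberId, int memberEpoch) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }

        /** Fatal transition: calls onPartitionsLost directly and leaves ID and epoch untouched. */
        void transitionToFatal() {
            lastCallback = "onPartitionsLost";
            state = "FATAL";
        }
    }

    public static void main(String[] args) {
        Member member = new Member("test-member-1", 5);
        member.transitionToFatal();
        System.out.println(member.state + " " + member.memberId + " " + member.memberEpoch
                + " " + member.lastCallback);
    }
}
```

Because the callback is not selected from the epoch on this path, the member can stay at its last known ID and epoch while in the fatal state.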






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395877609


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -168,10 +319,37 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 this.memberId = response.memberId();
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
+
 if (assignment != null) {
-setTargetAssignment(assignment);
+transitionTo(MemberState.RECONCILING);
+replaceUnresolvedAssignmentWithNewAssignment(assignment);
+resolveMetadataForUnresolvedAssignment();
+// TODO: improve reconciliation triggering. Initial approach of triggering on every
+//  HB response and metadata update.

Review Comment:
   Agree, done. All TODOs in this class have jiras already.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395862422


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -17,253 +17,1059 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
-import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class MembershipManagerImplTest {
 
 private static final String GROUP_ID = "test-group";
 private static final String MEMBER_ID = "test-member-1";
 private static final int MEMBER_EPOCH = 1;
-private final LogContext logContext = new LogContext();
+
+private SubscriptionState subscriptionState;
+private ConsumerMetadata metadata;
+
+private CommitRequestManager commitRequestManager;
+
+private ConsumerTestBuilder testBuilder;
+
+@BeforeEach
+public void setup() {
+testBuilder = new 
ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation());
+metadata = testBuilder.metadata;
+subscriptionState = testBuilder.subscriptions;
+commitRequestManager = testBuilder.commitRequestManager.get();
+}
+
+@AfterEach
+public void tearDown() {
+if (testBuilder != null) {
+testBuilder.close();
+}
+}
+
+private MembershipManagerImpl createMembershipManagerJoiningGroup() {
+MembershipManagerImpl manager = spy(new MembershipManagerImpl(
+GROUP_ID, subscriptionState, commitRequestManager,
+metadata, testBuilder.logContext));
+manager.transitionToJoining();
+return manager;
+}
+
+private MembershipManagerImpl createMembershipManagerJoiningGroup(String 
groupInstanceId,
+  String 
serverAssignor) {
+MembershipManagerImpl manager = new MembershipManagerImpl(
+GROUP_ID, Optional.ofNullable(groupInstanceId), 
Optional.ofNullable(serverAssignor),
+subscriptionState, commitRequestManager, metadata, 
testBuilder.logContext);
+manager.transitionToJoining();
+return manager;
+}
 
 @Test
 public void testMembershipManagerServerAssignor() {
-MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
 assertEquals(Optional.empty(), membershipManager.serverAssignor());
 
-membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", 
"Uniform", logContext);
+membershipManager = createMembershipManagerJoiningGroup("instance1", 
"Uniform");
 assertEquals(Optional.of("Uniform"), 
membershipManager.serverAssignor());
 }
 
 @Test
 public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
-new MembershipManagerImpl(GROUP_ID, logContext);
-new MembershipManagerImpl(GROUP_ID, null, null, logContext);
+createMembershipManagerJoiningGroup();
+createMembershipManagerJoiningGroup(null, null);
+}
+
+@Test
+public void 
testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() {
+// First join should register to get metadata updates
+MembershipManagerImpl manager = new MembershipManagerImpl(
+GROUP_ID, subscriptionState, commitRequestManager,
+metadata, testBuilder.logContext);
+manager.tran

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395677889


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -17,25 +17,93 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * Membership manager that maintains group membership for a single member, 
following the new
- * consumer group protocol.
+ * Group manager for a single consumer that has a group id defined in the 
config
+ * {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset 
management capability,
+ * and the consumer group protocol to get automatically assigned partitions 
when calling the
+ * subscribe API.
+ *
+ * 
+ *
+ * While the subscribe API hasn't been called (or if the consumer called 
unsubscribe), this manager
+ * will only be responsible for keeping the member in the {@link 
MemberState#UNSUBSCRIBED} state,
+ * where it can commit offsets to the group identified by the {@link 
#groupId()}, without joining
+ * the group.
+ *
+ * 
+ *
+ * If the consumer subscribe API is called, this manager will use the {@link 
#groupId()} to join the
+ * consumer group, and based on the consumer group protocol heartbeats, will 
handle the full
+ * lifecycle of the member as it joins the group, reconciles assignments, 
handles fencing and
+ * fatal errors, and leaves the group.
+ *
  * 
- * This is responsible for:
- * Keeping member info (ex. member id, member epoch, assignment, etc.)
- * Keeping member state as defined in {@link MemberState}.
+ *
+ * Reconciliation process:
+ * The member accepts all assignments received from the broker, resolves topic 
names from
+ * metadata, reconciles the resolved assignments, and keeps the unresolved to 
be reconciled when
+ * discovered with a metadata update. Reconciliations of resolved assignments 
are executed
+ * sequentially and acknowledged to the server as they complete. The 
reconciliation process
+ * involves multiple async operations, so the member will continue to 
heartbeat while these
+ * operations complete, to make sure that the member stays in the group while 
reconciling.
+ *
  * 
- * Member info and state are updated based on the heartbeat responses the 
member receives.
+ *
+ * Reconciliation steps:
+ * 
+ * Resolve topic names for all topic IDs received in the target 
assignment. Topic names
+ * found in metadata are then ready to be reconciled. Topic IDs not found 
are kept as
+ * unresolved, and the member request metadata updates until it resolves 
them (or the broker
+ * removes it from the target assignment.
+ * Commit offsets if auto-commit is enabled.
+ * Invoke the user-defined onPartitionsRevoked listener.
+ * Invoke the user-defined onPartitionsAssigned listener.
+ * When the above steps complete, the member acknowledges the 
reconciled assignment,
+ * which is the subset of the target that was resolved from metadata and 
actually reconciled.
+ * The ack is performed by sending a heartbeat request back to the broker, 
including the
+ * reconciled assignment.
+ * .

Review Comment:
   nit: I suppose that this one should go on the previous line.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();

Review Comment:
   nit: It may be worth logging something here as well, to be consistent with
   `transitionToFatal`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -168,10 +319,37 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 this.memberId = response.memberId();
 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395669911


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395667965


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-16 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395665723


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,113 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and 
release its
+ * assignment (calling user's callback for partitions revoked or lost). 
When all these
+ * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+ * effectively leave the group.
  */
-FAILED;
+PREPARE_LEAVING,
 
+/**
+ * Member has committed offsets and releases its assignment, so it stays 
in this state until
+ * the next heartbeat request is sent out with epoch -1 to effectively 
leave the group. This
+ * 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395143611


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ *
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe).
+ *
+ * If the member is not part of the group (epoch <= 0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ *
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395138981


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,113 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and 
release its
+ * assignment (calling user's callback for partitions revoked or lost). 
When all these
+ * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+ * effectively leave the group.
  */
-FAILED;
+PREPARE_LEAVING,
 
+/**
+ * Member has committed offsets and released its assignment, so it stays in this state until
+ * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This
+ 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395138233


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,113 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and 
release its
+ * assignment (calling user's callback for partitions revoked or lost). 
When all these
+ * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+ * effectively leave the group.
  */
-FAILED;
+PREPARE_LEAVING,
 
+/**
+ * Member has committed offsets and released its assignment, so it stays in this state until
+ * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This
+ 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395132794


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -137,39 +135,104 @@ public HeartbeatRequestManager(
 }
 
 /**
- * Determines the maximum wait time until the next poll based on the 
member's state, and creates a heartbeat
- * request.
+ * This will ensure that the member starts sending heartbeats to join the group with the
+ * updated subscription, if it is not already part of it. If the member is already part of
+ * the group, this will only ensure that the updated subscription is sent on the next
+ * heartbeat request. No action will be taken if the member is in a {@link MemberState#FATAL}
+ * state.
+ *
+ * Note that the list of topics of the subscription is taken from the shared subscription state.
+ */
+public void onSubscriptionUpdated() {
+if (membershipManager.state() == MemberState.FATAL) {
+logger.debug("No action taken to join the group or update the subscription because " +
+"the member is in FATAL state");
+return;
+}
+
+if (membershipManager.state() == MemberState.UNSUBSCRIBED) {
+membershipManager.transitionToJoining();
+}
+}
+
+/**
+ * Release assignment and send heartbeat request to leave the group. If the member is not
+ * part of the group or is in a FATAL state this won't take any action and will return a
+ * completed future.
+ *
+ * @return Future that will complete when the callback execution completes and the heartbeat
+ * request to leave is sent out. The future will fail if the callback execution fails.
+ */
+public CompletableFuture onUnsubscribe() {
+boolean notInGroup =
+membershipManager.state() == MemberState.UNSUBSCRIBED ||
+membershipManager.state() == MemberState.FATAL;
+if (notInGroup) {
+return CompletableFuture.completedFuture(null);
+}
+// TODO: Consider no-op if member is already LEAVING too (repeated 
calls to unsubscribe
+//  potentially storming the broker?). To double check the current 
behaviour, as it does
+//  not seem to handle it that way.

Review Comment:
   Done (now in the membership manager's `leaveGroup`). It is a no-op if the 
member is already leaving, and it returns the future that will complete when 
the ongoing leave completes. It also handles the case where the member has 
already left (no-op, returning right away).
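
For illustration only, the behaviour described in this comment could be sketched roughly as below. This is not the actual `MembershipManagerImpl` code: the class `LeaveGroupSketch`, its reduced `State` enum, and the `callbackResult` field (standing in for the user callback execution) are hypothetical names assumed for the example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the membership manager (not the Kafka class).
class LeaveGroupSketch {
    enum State { STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED, FATAL }

    private State state;
    // Future of the leave operation in progress, if any.
    private CompletableFuture<Void> leaveInProgress = null;
    // Assumption: completed externally to simulate the user callbacks finishing.
    final CompletableFuture<Void> callbackResult = new CompletableFuture<>();

    LeaveGroupSketch(State initial) {
        this.state = initial;
    }

    State state() {
        return state;
    }

    CompletableFuture<Void> leaveGroup() {
        // Member already left (or failed): no-op, return right away.
        if (state == State.UNSUBSCRIBED || state == State.FATAL) {
            return CompletableFuture.completedFuture(null);
        }
        // Member already leaving: no-op, hand back the future of the ongoing leave.
        if (state == State.PREPARE_LEAVING || state == State.LEAVING) {
            return leaveInProgress;
        }
        state = State.PREPARE_LEAVING;
        leaveInProgress = callbackResult;
        // Move on to LEAVING whether the callbacks succeeded or failed.
        callbackResult.whenComplete((result, error) -> state = State.LEAVING);
        return leaveInProgress;
    }
}
```

With this shape, repeated calls to unsubscribe share a single future instead of starting a new leave each time, which is the concern the original TODO raised.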






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395130865


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -137,39 +135,104 @@ public HeartbeatRequestManager(
 }
 
 /**
- * Determines the maximum wait time until the next poll based on the 
member's state, and creates a heartbeat
- * request.
+ * This will ensure that the member starts sending heartbeats to join the group with the
+ * updated subscription, if it is not already part of it. If the member is already part of
+ * the group, this will only ensure that the updated subscription is sent on the next
+ * heartbeat request. No action will be taken if the member is in a {@link MemberState#FATAL}
+ * state.
+ *
+ * Note that the list of topics of the subscription is taken from the shared subscription state.
+ */
+public void onSubscriptionUpdated() {

Review Comment:
   Agreed. I merged them into the membership manager, which goes in the same 
direction we've discussed about the membership manager becoming a first-class 
manager (supporting poll, for instance). For now I have integrated it with the 
`ApplicationEventProcessor`, to be able to move these two functions, which I 
agree make sense in the membership manager. When tackling the poll for 
triggering reconciliations, I will extend in this same direction.
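
As a rough sketch of what the subscription-update logic from the quoted diff might look like once it lives in the membership manager: the class `SubscriptionUpdateSketch` and its reduced `State` enum are hypothetical and assumed only for this example, not taken from the PR.

```java
// Hypothetical, simplified model of the membership manager (not the Kafka class).
class SubscriptionUpdateSketch {
    enum State { UNSUBSCRIBED, JOINING, STABLE, FATAL }

    private State state;

    SubscriptionUpdateSketch(State initial) {
        this.state = initial;
    }

    State state() {
        return state;
    }

    void onSubscriptionUpdated() {
        if (state == State.FATAL) {
            // Unrecoverable: take no action to join or update the subscription.
            return;
        }
        if (state == State.UNSUBSCRIBED) {
            // Not in the group yet: start joining with the updated subscription.
            state = State.JOINING;
        }
        // Otherwise the member is already in the group; in this sketch nothing changes
        // here, and the next heartbeat is assumed to carry the updated subscription.
    }
}
```

Keeping the state check and the transition in one class means the caller (the event processor in the comment above) only forwards the event and does not need to inspect member state itself.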






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395108554


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ *
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe).
+ *
+ * If the member is not part of the group (epoch <= 0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ *
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394789418


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ *
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe).
+ *
+ * If the member is not part of the group (epoch <= 0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ *
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394786770


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ *
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe).
+ *
+ * If the member is not part of the group (epoch <= 0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ *
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394774045


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394759427


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -168,10 +308,37 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 this.memberId = response.memberId();
 this.memberEpoch = response.memberEpoch();
 ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
+
 if (assignment != null) {
-setTargetAssignment(assignment);
+transitionTo(MemberState.RECONCILING);

Review Comment:
   You got it right, I expect the same thing (I had this
   [testReconciliationSkippedWhenSameAssignmentReceived](https://github.com/apache/kafka/blob/280652bc2a794bdd942506931bca756673bbf361/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java#L559)
   for that)
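
   For illustration only, a minimal sketch of the behaviour that test is meant to cover, assuming the member simply compares the received target assignment with the one it already owns. `AssignmentTracker` and its methods are hypothetical names, and partitions are plain strings here rather than the real types used in `MembershipManagerImpl`.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified stand-in; not the code in the PR. */
class AssignmentTracker {
    // Assignment the member currently owns (partitions as strings, e.g. "topic-0").
    private Set<String> currentAssignment = new TreeSet<>();
    // Number of reconciliations that were actually triggered.
    private int reconciliations = 0;

    /**
     * Handle a target assignment received in a heartbeat response.
     * Returns true if a reconciliation was triggered, false if it was skipped
     * because the target matches the current assignment.
     */
    boolean onTargetAssignment(Set<String> target) {
        if (target.equals(currentAssignment)) {
            return false; // same assignment received: nothing to reconcile
        }
        currentAssignment = new TreeSet<>(target);
        reconciliations++;
        return true;
    }

    int reconciliations() {
        return reconciliations;
    }
}
```

   Receiving the same assignment twice would trigger only one reconciliation in this sketch.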






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394753399


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,111 @@ public class MembershipManagerImpl implements MembershipManager {
 /**
  * Assignment that the member received from the server and successfully processed.
  */
-private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+private Set currentAssignment;
 
 /**
- * Assignment that the member received from the server but hasn't completely processed
- * yet.
+ * Subscription state object holding the current assignment the member has for the topics it
+ * subscribed to.
  */
-private Optional targetAssignment;
+private final SubscriptionState subscriptions;
+
+/**
+ * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}.
+ */
+private final ConsumerMetadata metadata;
+
+/**
+ * TopicPartition comparator based on topic name and partition id.
+ */
+private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator();
 
 /**
  * Logger.
  */
 private final Logger log;
 
-public MembershipManagerImpl(String groupId, LogContext logContext) {
-this(groupId, null, null, logContext);
+/**
+ * Manager to perform commit requests needed before revoking partitions (if auto-commit is
+ * enabled)
+ */
+private final CommitRequestManager commitRequestManager;
+
+/**
+ * Local cache of assigned topic IDs and names. Topics are added here when received in a
+ * target assignment, as we discover their names in the Metadata cache, and removed when the
+ * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata
+ * requests in cases where a currently assigned topic is in the target assignment (new
+ * partition assigned, or revoked), but it is not present the Metadata cache at that moment.
+ * The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the
+ * member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}).
+ */
+private final Map assignedTopicNamesCache;
+
+/**
+ * Topic IDs received in a target assignment for which we haven't found topic names yet.
+ * Items are added to this set every time a target assignment is received. Items are removed
+ * when metadata is found for the topic. This is where the member collects all assignments
+ * received from the broker, even though they may not be ready to reconcile due to missing
+ * metadata.
+ */
+private final Map> assignmentUnresolved;
+
+/**
+ * Assignment received for which topic names have been resolved, so it's ready to be
+ * reconciled. Items are added to this set when received in a target assignment (if metadata
+ * available), or when a metadata update is received. This is where the member keeps all the
+ * assignment ready to reconcile, even though the reconciliation might need to wait if there
+ * is already another on in process.
+ */
+private final SortedSet assignmentReadyToReconcile;
+
+/**
+ * Epoch that a member must include a heartbeat request to indicate that it want to join or
+ * re-join a group.
+ */
+public static final int JOIN_GROUP_EPOCH = 0;
+
+/**
+ * If there is a reconciliation running (triggering commit, callbacks) for the
+ * assignmentReadyToReconcile. This will be true if {@link #reconcile()} has been triggered
+ * after receiving a heartbeat response, or a metadata update.
+ */
+private boolean reconciliationInProgress;
+
+/**
+ * ID the member had when the reconciliation in progress started. This is used to identify if
+ * the member has rejoined while it was reconciling an assignment (in which case the result
+ * of the reconciliation is not applied.)
+ */
+private String memberIdOnReconciliationStart;

Review Comment:
   Yes, I just updated this. We're on the same page that the client will keep
   the member ID forever and provide it back, but I was wrongly expecting it
   would change after rejoining. Updated now. The goal is to be able to identify
   a rejoin, so I'm now using the member epoch (expecting that a member gets a
   bumped epoch every time it rejoins).
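
   Roughly, the idea can be sketched as below. This is only a simplified illustration under the assumption stated above (a rejoin always results in a different epoch); `RejoinAwareReconciler` and all of its members are hypothetical names, not the actual fields or methods in the PR. Note that this sketch discards the result on any epoch change, which is broader than detecting a rejoin only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in; not the code in the PR. */
class RejoinAwareReconciler {
    // Latest epoch received from the coordinator (0 while joining or re-joining).
    private int memberEpoch = 0;
    // Epoch the member had when the in-flight reconciliation started.
    private int epochOnReconciliationStart = -1;
    // Assignments whose reconciliation result was applied.
    private final List<String> appliedAssignments = new ArrayList<>();

    void onHeartbeatEpoch(int newEpoch) {
        memberEpoch = newEpoch;
    }

    void startReconciliation() {
        epochOnReconciliationStart = memberEpoch;
    }

    /**
     * Called when the commit and callbacks of a reconciliation complete.
     * Returns true if the result was applied, false if it was discarded
     * because the epoch changed while the reconciliation was running.
     */
    boolean completeReconciliation(String assignment) {
        if (epochOnReconciliationStart != memberEpoch) {
            return false; // member (re)joined in the meantime: discard the result
        }
        appliedAssignments.add(assignment);
        return true;
    }

    List<String> appliedAssignments() {
        return appliedAssignments;
    }
}
```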






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394746271


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());

Review Comment:
   Sure, done. The error is now logged the same way as for other callback
   failures.
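
   As a rough sketch of that shape (not the actual change), assuming the callback result is exposed as a `CompletableFuture`: the error is logged if the callback fails, and the member moves to the fatal state either way. `FatalTransitionSketch`, its `errorLog` list (standing in for the real logger) and the string-based state are hypothetical, for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in; not the code in the PR. */
class FatalTransitionSketch {
    // Stand-in for the real logger: collects the error messages that would be logged.
    final List<String> errorLog = new ArrayList<>();
    // Stand-in for the member state.
    String state = "STABLE";

    /** callbackResult represents the outcome of the onPartitionsLost callback. */
    void transitionToFatal(CompletableFuture<Void> callbackResult) {
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                // Log the failure; it does not prevent the transition below.
                errorLog.add("onPartitionsLost callback invocation failed while releasing "
                        + "assignment after member failed: " + error.getMessage());
            }
        });
        // The member ends up in the fatal state whether or not the callback failed.
        state = "FATAL";
    }
}
```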






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394742401


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);

Review Comment:
   You're right, we do. Added it, along with a log in case of error. 






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394733589


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +

Review Comment:
   Yes, done. That's how it's done for other callbacks (I also aligned the
   messages to make them consistent across all callbacks).






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394646561


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394629300


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394619681


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394585362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,111 @@ public class MembershipManagerImpl implements 
MembershipManager {
 /**
  * Assignment that the member received from the server and successfully 
processed.
  */
-private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+private Set currentAssignment;
 
 /**
- * Assignment that the member received from the server but hasn't 
completely processed
- * yet.
+ * Subscription state object holding the current assignment the member has 
for the topics it
+ * subscribed to.
  */
-private Optional 
targetAssignment;
+private final SubscriptionState subscriptions;
+
+/**
+ * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+ */
+private final ConsumerMetadata metadata;
+
+/**
+ * TopicPartition comparator based on topic name and partition id.
+ */
+private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
 /**
  * Logger.
  */
 private final Logger log;
 
-public MembershipManagerImpl(String groupId, LogContext logContext) {
-this(groupId, null, null, logContext);
+/**
+ * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+ * enabled)
+ */
+private final CommitRequestManager commitRequestManager;
+
+/**
+ * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+ * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+ * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+ * requests in cases where a currently assigned topic is in the target 
assignment (new
+ * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
+ * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+ * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+ */
+private final Map assignedTopicNamesCache;
+
+/**
+ * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+ * Items are added to this set every time a target assignment is received. 
Items are removed
+ * when metadata is found for the topic. This is where the member collects 
all assignments
+ * received from the broker, even though they may not be ready to 
reconcile due to missing
+ * metadata.
+ */
+private final Map> assignmentUnresolved;
+
+/**
+ * Assignment received for which topic names have been resolved, so it's 
ready to be
+ * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+ * available), or when a metadata update is received. This is where the 
member keeps all the
+ * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+ * is already another one in progress.
+ */
+private final SortedSet assignmentReadyToReconcile;
+
+/**
+ * Epoch that a member must include in a heartbeat request to indicate that 
it wants to join or
+ * re-join a group.
+ */
+public static final int JOIN_GROUP_EPOCH = 0;

Review Comment:
   Totally, done. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394581206


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,111 @@ public class MembershipManagerImpl implements 
MembershipManager {
 /**
  * Assignment that the member received from the server and successfully 
processed.
  */
-private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+private Set currentAssignment;
 
 /**
- * Assignment that the member received from the server but hasn't 
completely processed
- * yet.
+ * Subscription state object holding the current assignment the member has 
for the topics it
+ * subscribed to.
  */
-private Optional 
targetAssignment;
+private final SubscriptionState subscriptions;
+
+/**
+ * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+ */
+private final ConsumerMetadata metadata;
+
+/**
+ * TopicPartition comparator based on topic name and partition id.
+ */
+private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();

Review Comment:
   Done, re-arranged a couple of them.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394570775


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -17,25 +17,86 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * Membership manager that maintains group membership for a single member, 
following the new
- * consumer group protocol.
+ * Group manager for a single consumer that has a group id defined in the 
config
+ * {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset 
management capability,
+ * and the consumer group protocol to get automatically assigned partitions 
when calling the
+ * subscribe API.
+ *
+ * 
+ *
+ * While the subscribe API hasn't been called (or if the consumer called 
unsubscribe), this manager
+ * will only be responsible for keeping the member in the {@link 
MemberState#UNSUBSCRIBED} state,
+ * where it can commit offsets to the group identified by the {@link 
#groupId()}, without joining
+ * the group.
+ *
+ * 
+ *
+ * If the consumer subscribe API is called, this manager will use the {@link 
#groupId()} to join the
+ * consumer group, and based on the consumer group protocol heartbeats, will 
handle the full
+ * lifecycle of the member as it joins the group, reconciles assignments, 
handles fencing and
+ * fatal errors, and leaves the group.
+ *
  * 
- * This is responsible for:
- * Keeping member info (ex. member id, member epoch, assignment, etc.)
- * Keeping member state as defined in {@link MemberState}.
+ *
+ * Reconciliation process:
+ * The member accepts all assignments received from the broker, resolves topic 
names from
+ * metadata, reconciles the resolved assignments, and keeps the unresolved to 
be reconciled when
+ * discovered with a metadata update. Reconciliations of resolved assignments 
are executed
+ * sequentially and acknowledged to the server as they complete. The 
reconciliation process
+ * involves multiple async operations, so the member will continue to 
heartbeat while these
+ * operations complete, to make sure that the member stays in the group while 
reconciling.
+ *
  * 
- * Member info and state are updated based on the heartbeat responses the 
member receives.
+ *
+ * Reconciliation steps:
+ * 
+ * Resolve topic names for all topic IDs received in the target 
assignment. Topic names
+ * found in metadata are then ready to be reconciled. Topic IDs not found 
are kept as
+ * unresolved, and the member requests metadata updates until it resolves 
them (or the broker
+ * removes them from the target assignment).
+ * Commit offsets if auto-commit is enabled.
+ * Invoke the user-defined onPartitionsRevoked listener.
+ * Invoke the user-defined onPartitionsAssigned listener.
+ * When the above steps complete, the member acknowledges the target 
assignment by
+ * sending a heartbeat request back to the broker, including the full 
target assignment
+ * that was just reconciled.

Review Comment:
   Yes, you're right, I will rephrase this. It acknowledges the reconciled 
assignment, which is the subset of the target that was resolved from metadata 
and actually reconciled.
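
To illustrate the distinction, here is a minimal sketch of how a target assignment might split into the part that can be reconciled (and later acknowledged) and the part still waiting for metadata. This is not the code in the PR: the class `ReconciledSubsetSketch`, its `Split` holder and the `split` method are hypothetical names, and topic IDs are plain strings rather than Kafka's `Uuid` so the example needs only the JDK.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not the implementation from the PR.
// Topic IDs are plain strings here to avoid any Kafka dependency.
class ReconciledSubsetSketch {

    /** Result of splitting a target assignment by what metadata can resolve. */
    static final class Split {
        // topic name -> partitions: resolved, so it can be reconciled and acknowledged
        final Map<String, SortedSet<Integer>> ready = new TreeMap<>();
        // topic ID -> partitions: kept until a metadata update resolves the topic name
        final Map<String, SortedSet<Integer>> unresolved = new TreeMap<>();
    }

    /**
     * Splits the target assignment (keyed by topic ID) using the topic names
     * currently known from metadata (topic ID -> topic name).
     */
    static Split split(Map<String, SortedSet<Integer>> targetByTopicId,
                       Map<String, String> topicNamesById) {
        Split result = new Split();
        for (Map.Entry<String, SortedSet<Integer>> entry : targetByTopicId.entrySet()) {
            String topicName = topicNamesById.get(entry.getKey());
            if (topicName != null) {
                // Name found in metadata: this part of the target is ready to reconcile.
                result.ready.put(topicName, new TreeSet<>(entry.getValue()));
            } else {
                // Name not found yet: not reconciled, so it must not be acknowledged.
                result.unresolved.put(entry.getKey(), new TreeSet<>(entry.getValue()));
            }
        }
        return result;
    }
}
```

In this sketch only the contents of `ready` would be included in the acknowledgment, which is why "full target assignment" is inaccurate whenever `unresolved` is non-empty.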






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394493344


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment " +
+"after member got fenced. Member will rejoin the group 
anyway.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <= 0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394344222


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -102,6 +107,10 @@ Map topicIds() {
 return topicIds;
 }
 
+Map topicNames() {

Review Comment:
   Yes, done. 






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394322340


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -260,42 +924,52 @@ public Optional serverAssignor() {
  * {@inheritDoc}
  */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+public Set currentAssignment() {
 return this.currentAssignment;
 }
 
 
 /**
- * @return Assignment that the member received from the server but hasn't 
completely processed
- * yet. Visible for testing.
+ * @return Set of topic IDs received in a target assignment that have not 
been reconciled yet
+ * because topic names are not in metadata. Visible for testing.
  */
-Optional targetAssignment() 
{
-return targetAssignment;
+Set topicsWaitingForMetadata() {
+return Collections.unmodifiableSet(assignmentUnresolved.keySet());
 }
 
 /**
- * This indicates that the reconciliation of the target assignment has 
been successfully
- * completed, so it will make it effective by assigning it to the current 
assignment.
- *
- * @params Assignment that has been successfully reconciled. This is 
expected to
- * match the target assignment defined in {@link #targetAssignment()}
+ * @return Topic partitions received in a target assignment that have been 
resolved in
+ * metadata and are ready to be reconciled. Visible for testing.
+ */
+Set assignmentReadyToReconcile() {
+return Collections.unmodifiableSet(assignmentReadyToReconcile);
+}
+
+/**
+ * @return If there is a reconciliation in process now. Note that 
reconciliation is triggered
+ * by a call to {@link #reconcile()}. Visible for testing.
+ */
+boolean reconciliationInProgress() {
+return reconciliationInProgress;
+}
+
+/**
+ * When cluster metadata is updated, try to resolve topic names for topic 
IDs received in
+ * assignment that hasn't been resolved yet.
+ * 
+ * Try to find topic names for all unresolved assignments
+ * Add discovered topic names to the local topic names cache
+ * If any topics are resolved, trigger a reconciliation 
process
+ * If some topics still remain unresolved, request another 
metadata update
+ * 
  */
 @Override
-public void 
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-if (assignment == null) {
-throw new IllegalArgumentException("Assignment cannot be null");
-}
-if (!assignment.equals(targetAssignment.orElse(null))) {
-// This could be simplified to remove the assignment param and 
just assume that what
-// was reconciled was the targetAssignment, but keeping it 
explicit and failing fast
-// here to uncover any issues in the interaction of the assignment 
processing logic
-// and this.
-throw new IllegalStateException(String.format("Reconciled 
assignment %s does not " +
-"match the expected target assignment %s", 
assignment,
-targetAssignment.orElse(null)));
+public void onUpdate(ClusterResource clusterResource) {
+resolveMetadataForUnresolvedAssignment();
+if (!assignmentReadyToReconcile.isEmpty()) {
+// TODO: improve reconciliation triggering. Initial approach of 
triggering on every
+//  HB response and metadata update.

Review Comment:
   Filed [KAFKA-15832](https://issues.apache.org/jira/browse/KAFKA-15832) for 
this and I will take care of it right after this PR as a follow-up. 
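
For context, the metadata-update flow described in the Javadoc above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation and not the improvement tracked in KAFKA-15832: `MetadataUpdateSketch` and all of its members are hypothetical, topic IDs are plain strings instead of `Uuid`, and partitions ready to reconcile are represented as `"topic-partition"` strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch, not the implementation from the PR.
class MetadataUpdateSketch {
    // Local cache of topic ID -> topic name discovered so far.
    private final Map<String, String> namesCache = new HashMap<>();
    // Topic ID -> partitions received in a target assignment but not resolved yet.
    private final Map<String, SortedSet<Integer>> unresolved = new HashMap<>();
    // Resolved partitions, as "topic-partition" strings, waiting to be reconciled.
    private final SortedSet<String> readyToReconcile = new TreeSet<>();

    void addUnresolved(String topicId, SortedSet<Integer> partitions) {
        unresolved.put(topicId, new TreeSet<>(partitions));
    }

    /**
     * Tries to resolve every pending topic ID against the latest metadata
     * (topic ID -> topic name).
     *
     * @return true if some topics remain unresolved, meaning another
     *         metadata update should be requested.
     */
    boolean onMetadataUpdate(Map<String, String> topicNamesById) {
        Iterator<Map.Entry<String, SortedSet<Integer>>> it = unresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SortedSet<Integer>> entry = it.next();
            String topicName = topicNamesById.get(entry.getKey());
            if (topicName != null) {
                namesCache.put(entry.getKey(), topicName);
                for (Integer partition : entry.getValue()) {
                    readyToReconcile.add(topicName + "-" + partition);
                }
                it.remove();
            }
        }
        return !unresolved.isEmpty();
    }

    /** A reconciliation is worth triggering only if something was resolved. */
    boolean shouldReconcile() {
        return !readyToReconcile.isEmpty();
    }

    SortedSet<String> readyToReconcile() {
        return Collections.unmodifiableSortedSet(readyToReconcile);
    }
}
```

A caller would invoke `onMetadataUpdate` on each metadata update, trigger a reconciliation when `shouldReconcile()` is true, and request another update while the method keeps returning true.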






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394053638


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment " +
+"after member got fenced. Member will rejoin the group 
anyway.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <= 0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394042794


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment " +
+"after member got fenced. Member will rejoin the group 
anyway.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <= 0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-15 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394032677


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment " +
+"after member got fenced. Member will rejoin the group 
anyway.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <= 0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392954694


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment " +
+"after member got fenced. Member will rejoin the group 
anyway.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392830639


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements 
MembershipManager {
 /**
  * Assignment that the member received from the server and successfully 
processed.
  */
-private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+private Set currentAssignment;
 
 /**
- * Assignment that the member received from the server but hasn't 
completely processed
- * yet.
+ * Subscription state object holding the current assignment the member has 
for the topics it
+ * subscribed to.
  */
-private Optional 
targetAssignment;
+private final SubscriptionState subscriptions;
+
+/**
+ * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+ */
+private final ConsumerMetadata metadata;
+
+/**
+ * TopicPartition comparator based on topic name and partition id.
+ */
+private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
 /**
  * Logger.
  */
 private final Logger log;
 
-public MembershipManagerImpl(String groupId, LogContext logContext) {
-this(groupId, null, null, logContext);
+/**
+ * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+ * enabled)
+ */
+private final CommitRequestManager commitRequestManager;
+
+/**
+ * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+ * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+ * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+ * requests in cases where a currently assigned topic is in the target 
assignment (new
+ * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
+ * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+ * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+ */
+private final Map assignedTopicNamesCache;
+
+/**
+ * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+ * Items are added to this set every time a target assignment is received. 
Items are removed
+ * when metadata is found for the topic. This is where the member collects 
all assignments
+ * received from the broker, even though they may not be ready to 
reconcile due to missing
+ * metadata.
+ */
+private final Map> assignmentUnresolved;
+
+/**
+ * Assignment received for which topic names have been resolved, so it's 
ready to be
+ * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+ * available), or when a metadata update is received. This is where the 
member keeps all the
+ * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+ * is already another on in process.
+ */
+private final SortedSet assignmentReadyToReconcile;
+
+/**
+ * Epoch that a member must include a heartbeat request to indicate that 
it want to join or
+ * re-join a group.
+ */
+public static final int JOIN_GROUP_EPOCH = 0;
+
+/**
+ * If there is a reconciliation running (triggering commit, callbacks) for 
the
+ * assignmentReadyToReconcile. This will be true if {@link #reconcile()} 
has been triggered
+ * after receiving a heartbeat response, or a metadata update.
+ */
+private boolean reconciliationInProgress;
+
+/**
+ * ID the member had when the reconciliation in progress started. This is 
used to identify if
+ * the member has rejoined while it was reconciling an assignment (in 
which case the result
+ * of the reconciliation is not applied.)
+ */
+private String memberIdOnReconciliationStart;
+
+

Review Comment:
   Those kinds of things tend to jump out in _other people's_ code, but I 
frequently miss them in my own 😄 




Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392829031


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
 
 public enum Type {
 COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIPTION_CHANGE,

Review Comment:
   I never liked `ASSIGNMENT_CHANGE` either, but I guess it's consistent, so 👍 






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392827512


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Application event triggered when a user calls the unsubscribe API. This 
will make the consumer
+ * release all its assignments and send a heartbeat request to leave the 
consumer group.
+ * This event holds a future that will complete when the invocation of 
callbacks to release
+ * complete and the heartbeat to leave the group is sent out (minimal effort 
to send the
+ * leave group heartbeat, without waiting for any response or considering 
timeouts).
+ */
+public class UnsubscribeApplicationEvent extends 
CompletableApplicationEvent {

Review Comment:
   Where does it block? I didn't see a call to `Future.get()` when I looked.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392802141


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements 
MembershipManager {
 /**
  * Assignment that the member received from the server and successfully 
processed.
  */
-private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+private Set currentAssignment;
 
 /**
- * Assignment that the member received from the server but hasn't 
completely processed
- * yet.
+ * Subscription state object holding the current assignment the member has 
for the topics it
+ * subscribed to.
  */
-private Optional 
targetAssignment;
+private final SubscriptionState subscriptions;
+
+/**
+ * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+ */
+private final ConsumerMetadata metadata;
+
+/**
+ * TopicPartition comparator based on topic name and partition id.
+ */
+private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
 /**
  * Logger.
  */
 private final Logger log;
 
-public MembershipManagerImpl(String groupId, LogContext logContext) {
-this(groupId, null, null, logContext);
+/**
+ * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+ * enabled)
+ */
+private final CommitRequestManager commitRequestManager;
+
+/**
+ * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+ * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+ * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+ * requests in cases where a currently assigned topic is in the target 
assignment (new
+ * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
+ * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+ * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+ */
+private final Map assignedTopicNamesCache;
+
+/**
+ * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+ * Items are added to this set every time a target assignment is received. 
Items are removed
+ * when metadata is found for the topic. This is where the member collects 
all assignments
+ * received from the broker, even though they may not be ready to 
reconcile due to missing
+ * metadata.
+ */
+private final Map> assignmentUnresolved;
+
+/**
+ * Assignment received for which topic names have been resolved, so it's 
ready to be
+ * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+ * available), or when a metadata update is received. This is where the 
member keeps all the
+ * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+ * is already another on in process.
+ */
+private final SortedSet assignmentReadyToReconcile;
+
+/**
+ * Epoch that a member must include a heartbeat request to indicate that 
it want to join or
+ * re-join a group.
+ */
+public static final int JOIN_GROUP_EPOCH = 0;
+
+/**
+ * If there is a reconciliation running (triggering commit, callbacks) for 
the
+ * assignmentReadyToReconcile. This will be true if {@link #reconcile()} 
has been triggered
+ * after receiving a heartbeat response, or a metadata update.
+ */
+private boolean reconciliationInProgress;
+
+/**
+ * ID the member had when the reconciliation in progress started. This is 
used to identify if
+ * the member has rejoined while it was reconciling an assignment (in 
which case the result
+ * of the reconciliation is not applied.)
+ */
+private String memberIdOnReconciliationStart;
+
+

Review Comment:
   he he, I do avoid this, missed it here, fixed ;)






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392790381


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+clearPendingAssignmentsAndLocalNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearPendingAssignmentsAndLocalNamesCache();
+// Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+// reconciliation completes while the member is rejoining but hasn't 
received the new
+// member ID yet, the reconciliation result is discarded.
+memberIdOnReconciliationStart = null;
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
  */
-private boolean maybeTransitionToStable() {
-if (!hasPendingTargetAssignment()) {
-transitionTo(MemberState.STABLE);
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+  

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392748787


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
 
 public enum Type {
 COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIPTION_CHANGE,

Review Comment:
   It's triggered exactly when the user changes the subscription via a call to
subscribe. I used the name `SUBSCRIPTION_CHANGE` because it seemed clear and is
consistent with the existing `ASSIGNMENT_CHANGE`, but let me know if you think
another name would be better.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392744058


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,111 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and 
release its
+ * assignment (calling user's callback for partitions revoked or lost). 
When all these
+ * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+ * effectively leave the group.
+ */
+PREPARE_LEAVING,
+
+/**
+ * Member has committed offsets and releases its assignment, so it stays 
in this state until
+ * the next heartbeat request is sent out with epoch -1 to effectively 
leave the group. This
+ * state indic

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392732597


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1093,6 +1099,10 @@ private void subscribeInternal(Collection 
topics, Optional(topics), listener))
 metadata.requestUpdateForNewTopics();
+
+// Trigger subscribe event to effectively join the group if not 
already part of it,
+// or just send the new subscription to the broker.
+applicationEventHandler.add(new 
SubscriptionChangeApplicationEvent());

Review Comment:
   I see, I was intentionally leaving out all the pattern-based logic
because we don't support it at this point. But this makes me realize that the
pattern-based `subscribeInternal` you mentioned is wired to the
`subscribe(Pattern pattern)` API call, when it's truly not supported yet. I
think we should disable all pattern-based subscribe calls until we implement
them properly. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392733198


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -260,42 +900,52 @@ public Optional serverAssignor() {
  * {@inheritDoc}
  */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+public Set currentAssignment() {
 return this.currentAssignment;
 }
 
 
 /**
- * @return Assignment that the member received from the server but hasn't 
completely processed
- * yet. Visible for testing.
+ * @return Set of topic IDs received in a target assignment that have not 
been reconciled yet
+ * because topic names are not in metadata. Visible for testing.
  */
-Optional targetAssignment() 
{
-return targetAssignment;
+Set topicsWaitingForMetadata() {
+return Collections.unmodifiableSet(assignmentUnresolved.keySet());
 }
 
 /**
- * This indicates that the reconciliation of the target assignment has 
been successfully
- * completed, so it will make it effective by assigning it to the current 
assignment.
- *
- * @params Assignment that has been successfully reconciled. This is 
expected to
- * match the target assignment defined in {@link #targetAssignment()}
+ * @return Topic partitions received in a target assignment that have been 
resolved in
+ * metadata and are ready to be reconciled. Visible for testing.
+ */
+Set assignmentReadyToReconcile() {
+return Collections.unmodifiableSet(assignmentReadyToReconcile);
+}
+
+/**
+ * @return If there is a reconciliation in process now. Note that 
reconciliation is triggered
+ * by a call to {@link #reconcile()}. Visible for testing.
+ */
+boolean reconciliationInProgress() {
+return reconciliationInProgress;
+}
+
+/**
+ * When cluster metadata is updated, try to resolve topic names for topic 
IDs received in
+ * assignment that hasn't been resolved yet.
+ * 
+ * Try to find topic names for all unresolved assignments
+ * Add discovered topic names to the local topic names cache
+ * If any topics are resolved, trigger a reconciliation 
process
+ * If some topics still remain unresolved, request another 
metadata update
+ * 
  */
 @Override
-public void 
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-if (assignment == null) {
-throw new IllegalArgumentException("Assignment cannot be null");
-}
-if (!assignment.equals(targetAssignment.orElse(null))) {
-// This could be simplified to remove the assignment param and 
just assume that what
-// was reconciled was the targetAssignment, but keeping it 
explicit and failing fast
-// here to uncover any issues in the interaction of the assignment 
processing logic
-// and this.
-throw new IllegalStateException(String.format("Reconciled 
assignment %s does not " +
-"match the expected target assignment %s", 
assignment,
-targetAssignment.orElse(null)));
+public void onUpdate(ClusterResource clusterResource) {
+resolveMetadataForUnresolvedAssignment();
+if (!assignmentReadyToReconcile.isEmpty()) {
+// TODO: improve reconciliation triggering. Initial approach of 
triggering on every
+//  HB response and metadata update.
+reconcile();
 }

Review Comment:
   Good catch, added. 






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-14 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392717259


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Application event triggered when a user calls the unsubscribe API. This will make the consumer
+ * release all its assignments and send a heartbeat request to leave the consumer group.
+ * This event holds a future that will complete when the invocation of callbacks to release
+ * complete and the heartbeat to leave the group is sent out (minimal effort to send the
+ * leave group heartbeat, without waiting for any response or considering timeouts).
+ */
+public class UnsubscribeApplicationEvent extends CompletableApplicationEvent {

Review Comment:
   `Consumer.unsubscribe` does block on the callback execution, which is why 
it is a `CompletableApplicationEvent`. Only after the callback completes can 
unsubscribe send the actual leave group heartbeat request. Makes sense?
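
To illustrate the ordering described in this comment, here is a minimal sketch. It assumes hypothetical `CompletableEventSketch` and `UnsubscribeSketch` classes (neither is part of Kafka or of this PR) and uses a single-threaded executor as a stand-in for the consumer's background thread. The real event classes and heartbeat manager may well differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a completable event: it only carries a future.
class CompletableEventSketch {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

class UnsubscribeSketch {
    // Records the order of steps so the sequencing can be inspected.
    static final List<String> steps = Collections.synchronizedList(new ArrayList<>());

    // Background thread: run the callback first, then send the leave heartbeat
    // (best effort, no response awaited), then complete the event's future.
    static void process(CompletableEventSketch event) {
        steps.add("callback");          // stands in for the rebalance listener callback
        steps.add("leave-heartbeat");   // stands in for queuing the leave group heartbeat
        event.future.complete(null);
    }

    // Application thread: hand the event to the background thread and block on it.
    static void unsubscribe(ExecutorService background) {
        CompletableEventSketch event = new CompletableEventSketch();
        background.execute(() -> process(event));
        event.future.join();            // the caller blocks here until the event completes
        steps.add("unsubscribe-returned");
    }

    public static void main(String[] args) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            unsubscribe(background);
        } finally {
            background.shutdown();
        }
        System.out.println(steps);
    }
}
```

In this sketch the caller cannot return before the callback has run, which is the property that motivates a completable event over a fire-and-forget one.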






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-13 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1391862343


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1093,6 +1099,10 @@ private void subscribeInternal(Collection 
topics, Optional(topics), listener))
 metadata.requestUpdateForNewTopics();
+
+// Trigger subscribe event to effectively join the group if not already part of it,
+// or just send the new subscription to the broker.
+applicationEventHandler.add(new SubscriptionChangeApplicationEvent());

Review Comment:
   There's another `subscribeInternal()` for the topic pattern path. We want 
this there too, right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
 @Override
 public void enforceRebalance() {
-throw new KafkaException("method not implemented");

Review Comment:
   Good call!



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Application event triggered when a user calls the unsubscribe API. This will make the consumer
+ * release all its assignments and send a heartbeat request to leave the consumer group.
+ * This event holds a future that will complete when the invocation of callbacks to release
+ * complete and the heartbeat to leave the group is sent out (minimal effort to send the
+ * leave group heartbeat, without waiting for any response or considering timeouts).
+ */
+public class UnsubscribeApplicationEvent extends CompletableApplicationEvent {

Review Comment:
   The intention of the `CompletableApplicationEvent` was to have a way for 
the consumer to block on the results of operations performed in the background 
thread. Since the `Consumer.unsubscribe()` API call is non-blocking, I'm 
thinking this should be a subclass of `ApplicationEvent`.
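
The distinction drawn in this comment can be sketched as follows. All names here (`EventSketch`, `FireAndForgetEvent`, `CompletableEvent`, `EventHandlerSketch`) are hypothetical and only model the idea of a plain event versus one that carries a future. They are not the PR's classes, and the real handler runs on a separate thread rather than being drained by hand.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical base event: the caller enqueues it and moves on.
abstract class EventSketch {
    abstract String name();
}

class FireAndForgetEvent extends EventSketch {
    String name() { return "fire-and-forget"; }
}

// Hypothetical completable variant: the caller can wait on the result.
class CompletableEvent<T> extends EventSketch {
    final CompletableFuture<T> future = new CompletableFuture<>();
    String name() { return "completable"; }
}

class EventHandlerSketch {
    private final BlockingQueue<EventSketch> queue = new LinkedBlockingQueue<>();

    // Non-blocking path: nothing is returned to wait on.
    void add(EventSketch event) {
        queue.add(event);
    }

    // Blocking-capable path: the caller receives the future and may block on it.
    <T> CompletableFuture<T> addAndGetFuture(CompletableEvent<T> event) {
        add(event);
        return event.future;
    }

    // Processes queued events as a background thread would, completing
    // the future of any completable event. Returns the number processed.
    int drain() {
        int processed = 0;
        EventSketch event;
        while ((event = queue.poll()) != null) {
            if (event instanceof CompletableEvent) {
                ((CompletableEvent<?>) event).future.complete(null);
            }
            processed++;
        }
        return processed;
    }
}
```

Under these assumptions, an API call that must not return until the background work is done would use `addAndGetFuture`, while a truly non-blocking call would use `add`.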



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
 
 public enum Type {
 COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
-LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE,

Review Comment:
   ```suggestion
   LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIBED,
   ```
   
   `SUBSCRIPTION_CHANGE` is a bit vague. Does it encompass more than the event 
of the user calling `Consumer.subscribe()`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", membe

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-12 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1390590490


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +256,465 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = LEAVE_GROUP_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke 
onPartitionsLost.
+callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-12 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1390583612


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,111 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and 
release its
+ * assignment (calling user's callback for partitions revoked or lost). 
When all these
+ * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+ * effectively leave the group.
+ */
+PREPARE_LEAVING,
+
+/**
+ * Member has committed offsets and releases its assignment, so it stays 
in this state until

Review Comment:
   Yes, similar to the ACKNOWLEDGING in the sense that they are just a way to 
indicate that a heartb

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-12 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1390582429


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
  * @param topic to be requested. If empty, return the metadata for all 
topics.
  * @return the future of the metadata request.
  */
-public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {
+public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {

Review Comment:
   You're right, not needed anymore (all metadata interaction is now based on 
the centralized metadata cache). All removed. 






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-10 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1389220863


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
  * @param topic to be requested. If empty, return the metadata for all 
topics.
  * @return the future of the metadata request.
  */
-public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {
+public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {

Review Comment:
   Do we still need the changes in this class? It seems that we don't use them 
anymore. I have the same question for the new `Topic` class.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-08 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1387282757


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -17,57 +17,111 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+ * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+ * assignments. This will be the state when the member has never 
subscribed, or when it has
+ * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+ * be an active member of the consumer group (no heartbeats sent).
  */
-UNJOINED,
+UNSUBSCRIBED,
+
+/**
+ * Member is attempting to join a consumer group. While in this state, the 
member will send
+ * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+ * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+ * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+ */
+JOINING,
 
 /**
  * Member has received a new target assignment (partitions could have been 
assigned or
- * revoked), and it is processing it. While in this state, the member will
- * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
- * the new assignment effective.
+ * revoked), and it is processing it. While in this state, the member will 
continue to send
+ * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+ * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+ * the new assignment effective). Note that while in this state the member 
may be trying to
+ * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+ * already resolved.
  */
-// TODO: determine if separate state will be needed for assign/revoke (not 
for now)
 RECONCILING,
 
 /**
- * Member is active in a group (heartbeating) and has processed all 
assignments received.
+ * Member has completed reconciling an assignment received, and stays in 
this state only until
+ * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+ * state indicates that the next heartbeat request must be sent without 
waiting for the
+ * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+ * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+ * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+ * received from the broker)
+ */
+ACKNOWLEDGING,
+
+/**
+ * Member is active in a group and has processed all assignments received. 
While in this
+ * state, the member will send heartbeats on the interval.
  */
 STABLE,
 
 /**
- * Member transitions to this state when it receives a
- * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
- * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
- * broker. This is a recoverable state, where the member
- * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+ * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+ * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+ * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+ * transition to {@link #JOINING} to re-join the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+ * member will stop sending heartbeats, will commit offsets if needed and 
release its
+ * assignment (calling user's callback for partitions revoked or lost). 
When all these
+ * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+ * effectively leave the group.
+ */
+PREPARE_LEAVING,
+
+/**
+ * Member has committed offsets and releases its assignment, so it stays 
in this state until

Review Comment:
   This is just a transient state for the member to send the leave group 
heartbeat with epoch -1/-

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-07 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1385350785


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +256,465 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = LEAVE_GROUP_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke 
onPartitionsLost.
+callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-07 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383658863


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -166,19 +166,41 @@ private static long findMinTime(final Collection request
 .orElse(Long.MAX_VALUE);
 }
 
-public void maybeAutoCommit(final Map offsets) {
+/**
+ * Generate a request to commit offsets if auto-commit is enabled. The request will be
+ * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
+ * request if there is no other commit request already in-flight, and if the commit interval
+ * has elapsed.
+ *
+ * @param offsets Offsets to commit
+ * @return Future that will complete when a response is received for the request, or a
+ * completed future if no request is generated.
+ */
+public CompletableFuture maybeAutoCommit(final Map offsets) {
 if (!autoCommitState.isPresent()) {
-return;
+return CompletableFuture.completedFuture(null);
 }
 
 AutoCommitState autocommit = autoCommitState.get();
 if (!autocommit.canSendAutocommit()) {
-return;
+return CompletableFuture.completedFuture(null);
 }
 
-sendAutoCommit(offsets);
+CompletableFuture result = sendAutoCommit(offsets);
 autocommit.resetTimer();
 autocommit.setInflightCommitStatus(true);
+return result;
+}
+
+/**
+ * If auto-commit is enabled, this will generate a commit offsets request for all assigned
+ * partitions and their current positions.
+ *
+ * @return Future that will complete when a response is received for the request, or a
+ * completed future if no request is generated.
+ */
+public CompletableFuture maybeAutoCommitAllConsumed() {

Review Comment:
   Can we return an optional future here because there's technically nothing to 
be completed?
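
The two return styles under discussion can be compared with a small sketch. `AutoCommitSketch` and both method names are hypothetical, and the `autoCommitEnabled` flag stands in for the `autoCommitState` and timer checks shown in the diff. The futures returned on the "request sent" path are simply left incomplete here, since no network layer is modeled.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class AutoCommitSketch {
    // Style used in the diff: always return a future, already completed
    // when no request is generated.
    static CompletableFuture<Void> maybeCommit(boolean autoCommitEnabled) {
        if (!autoCommitEnabled) {
            return CompletableFuture.completedFuture(null);
        }
        // Would complete later, when a response is received.
        return new CompletableFuture<>();
    }

    // Style raised in the review: an empty Optional signals that
    // no request was generated at all.
    static Optional<CompletableFuture<Void>> maybeCommitOptional(boolean autoCommitEnabled) {
        if (!autoCommitEnabled) {
            return Optional.empty();
        }
        return Optional.of(new CompletableFuture<>());
    }
}
```

The first style lets callers chain on the result uniformly without a presence check. The second lets them tell "nothing was sent" apart from "sent and already completed", at the cost of an extra unwrapping step.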






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-07 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1385085418


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +262,465 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = LEAVE_GROUP_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to effectively leave the
+// group (even in the case where the member had no assignment to release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
 }
 
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke onPartitionsLost.
+callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave the group heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+ * request is sent out with it.
+ */
+private void transitionToSendingLeaveGroup() {
+memberEpoch = leaveGroupEpoch();
+currentAssignment = new HashSet<>();
+targetAssignment = Optional.empty();
+transit

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-07 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1385082158


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +307,469 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();

Review Comment:
   Agreed, done here. Note my answer above, though: for the cases where we do 
need revoke or lost, that one should stay.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-07 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1385059304


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by the legacy consumer
+throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   Agreed that unsubscribe blocks on the callbacks, but this PR does not yet 
include the implementation for how callbacks are executed, so this unsubscribe 
does not use it. That should come once we settle the implementation details in 
the follow-up PR (depending on how we end up doing it, the blocking may happen 
not here but on the subscribe/unsubscribe events).
   
   As for the exception, it will originate in the application thread, where the 
callback is executed, and it should only be returned to the user when it calls 
poll, to maintain the current behaviour. I removed it from here to stay 
consistent and to leave all logic related to callback execution out; I see the 
confusion it introduced.
   
   The `whenComplete` should stay because it represents a concept we need that 
is unrelated to callbacks: we must update the `subscriptionState` only when 
the Unsubscribe event completes (HB to leave group sent to the broker). We need 
it now to make sure we can run unsubscribe without callbacks, send the leave 
group, and clear the subscription state after sending the request.
   
   I am trying to leave it in a consistent state with no callbacks. The 
follow-up PR should introduce the implementation for executing them, blocking 
appropriately on the execution, and throwing the exceptions.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1384076305


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by the legacy consumer
+throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   I think this KafkaException might not be in the right place, because the 
rebalance listener needs to be invoked on the main thread, probably before 
sending out the event. I wonder if we could just remove this whenComplete and 
rely on the background thread to log the failures during the leave group event. 
If a fatal exception is thrown there, the sensible way seems to be to enqueue 
it to the BackgroundEventQueue and handle it in the poll. wdyt?
   
   In the current code path, I think exceptions can only be thrown in 
`onLeavePrepare`.
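
The idea of recording a background failure and surfacing it from poll can be sketched roughly as below. This is only an illustration under assumptions: `DeferredErrorSketch`, `onBackgroundFailure` and `poll` are hypothetical names, and a plain `ConcurrentLinkedQueue` stands in for the BackgroundEventQueue mentioned above. It is not the consumer's actual implementation.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: these are not the real consumer classes.
final class DeferredErrorSketch {

    // Stand-in for the background event queue shared by the two threads.
    private final Queue<RuntimeException> backgroundErrors = new ConcurrentLinkedQueue<>();

    /** Background thread: record the failure instead of throwing it. */
    void onBackgroundFailure(RuntimeException error) {
        backgroundErrors.add(error);
    }

    /** Application thread: rethrow the oldest queued failure, if there is one. */
    void poll() {
        RuntimeException error = backgroundErrors.poll();
        if (error != null) {
            throw error;
        }
    }

    public static void main(String[] args) {
        DeferredErrorSketch consumer = new DeferredErrorSketch();
        consumer.poll(); // nothing queued, returns normally
        consumer.onBackgroundFailure(new IllegalStateException("leave group failed"));
        try {
            consumer.poll();
        } catch (IllegalStateException e) {
            System.out.println("surfaced in poll: " + e.getMessage());
        }
    }
}
```

With this shape the background thread never throws towards the user; the failure becomes visible only on the next call made from the application thread.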






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1384016549


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by the legacy consumer
+throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   I think unsubscribe() actually blocks on the callback invocation and throws 
if the callback fails. Instead of putting the logic in whenComplete, it seems 
like we should wait until the callback completes and then throw if needed. I 
assume we want to maintain this behavior for the async consumer.
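
A minimal sketch of "wait for the callback, then throw" could look like the following. The names `BlockingUnsubscribeSketch`, `unsubscribeBlocking` and `SketchKafkaException` are hypothetical; the exception class is only a stand-in for `KafkaException` so that the example has no dependencies, and the future passed in is assumed to represent the callback execution.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: not the real consumer code.
final class BlockingUnsubscribeSketch {

    /** Stand-in for org.apache.kafka.common.KafkaException. */
    static final class SketchKafkaException extends RuntimeException {
        SketchKafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Blocks until the callback future completes and rethrows a failure to the caller. */
    static void unsubscribeBlocking(CompletableFuture<Void> callbackResult) {
        try {
            callbackResult.get();
        } catch (ExecutionException e) {
            // get() wraps the original failure; unwrap it for the caller.
            throw new SketchKafkaException("User rebalance callback throws an error", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SketchKafkaException("Interrupted while waiting for the callback", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("listener failed"));
        try {
            unsubscribeBlocking(failed);
        } catch (SketchKafkaException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

Unlike an action registered with `whenComplete`, the exception here is thrown on the calling thread, so the caller of the blocking method sees it.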






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383998319


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by the legacy consumer
+throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   The current consumer throws KafkaException on unsubscribe()






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1384004581


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Topic.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+
+/**
+ * A topic represented by a universally unique identifier and a topic name.
+ */
+public class Topic {
+
+private final Uuid topicId;
+
+private final String topicName;
+
+public Topic(Uuid topicId) {
+this(topicId, null);
+}
+
+public Topic(String topicName) {
+this(null, topicName);
+}
+
+public Topic(Uuid topicId, String topicName) {
+this.topicId = topicId;
+this.topicName = topicName;
+}
+
+public Uuid topicId() {
+return topicId;
+}
+
+public String topicName() {
+return topicName;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+Topic other = (Topic) o;
+return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName);
+}
+
+@Override
+public int hashCode() {
+final int prime = 31;
+int result = prime + Objects.hashCode(topicId);
+result = prime * result + Objects.hashCode(topicName);
+return result;
+}
+
+@Override
+public String toString() {
+return topicId + ":" + topicName;

Review Comment:
   Thanks for the clarification!






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383998319


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by the legacy consumer
+throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   The current consumer throws KafkaException on unsubscribe(). I think 
throwing in the whenComplete doesn't actually throw to the caller.
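
The point about `whenComplete` can be checked with the JDK alone. In the sketch below (class and method names are hypothetical), an exception thrown inside the `whenComplete` action never reaches the code that registered the action or the code that completes the future; it only affects the dependent future that `whenComplete` returns.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; only JDK APIs are used.
final class WhenCompleteThrowDemo {

    /** Returns true only if the exception thrown in the action reaches the calling code. */
    static boolean throwEscapesToCaller() {
        CompletableFuture<Void> source = new CompletableFuture<>();
        try {
            source.whenComplete((result, error) -> {
                if (error != null) {
                    throw new IllegalStateException("User rebalance callback throws an error", error);
                }
            });
            // The action runs here, on the completing thread, but its exception is captured.
            source.completeExceptionally(new RuntimeException("callback failed"));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    /** The dependent future is where a throw from the action ends up. */
    static boolean dependentFutureFails() {
        CompletableFuture<Void> source = new CompletableFuture<>();
        CompletableFuture<Void> dependent = source.whenComplete((result, error) -> {
            throw new IllegalStateException("thrown inside whenComplete");
        });
        source.complete(null);
        return dependent.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("escapes to caller: " + throwEscapesToCaller());
        System.out.println("dependent future failed: " + dependentFutureFails());
    }
}
```

Since the future returned by `whenComplete` is discarded in the diff above, the `KafkaException` thrown there would not be observed by the user of `unsubscribe()`.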






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383995398


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
 @Override
 public void enforceRebalance() {
-throw new KafkaException("method not implemented");
+log.warn("Operation not supported in new consumer group protocol");
 }
 
 @Override
 public void enforceRebalance(String reason) {
-throw new KafkaException("method not implemented");
+log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   Thanks for the clarification!






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383986418


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Topic.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+
+/**
+ * A topic represented by a universally unique identifier and a topic name.
+ */
+public class Topic {
+
+private final Uuid topicId;
+
+private final String topicName;
+
+public Topic(Uuid topicId) {
+this(topicId, null);
+}
+
+public Topic(String topicName) {
+this(null, topicName);
+}
+
+public Topic(Uuid topicId, String topicName) {
+this.topicId = topicId;
+this.topicName = topicName;
+}
+
+public Uuid topicId() {
+return topicId;
+}
+
+public String topicName() {
+return topicName;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+Topic other = (Topic) o;
+return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName);
+}
+
+@Override
+public int hashCode() {
+final int prime = 31;
+int result = prime + Objects.hashCode(topicId);
+result = prime * result + Objects.hashCode(topicName);
+return result;
+}
+
+@Override
+public String toString() {
+return topicId + ":" + topicName;

Review Comment:
   Actually, I intentionally followed the convention of the TopicPartition and 
TopicIdPartition toString implementations, since this new Topic class is kind 
of a sibling. Does that make sense?






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383983917


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
 @Override
 public void enforceRebalance() {
-throw new KafkaException("method not implemented");
+log.warn("Operation not supported in new consumer group protocol");
 }
 
 @Override
 public void enforceRebalance(String reason) {
-throw new KafkaException("method not implemented");
+log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   The KIP states that we should do exactly this, and actually the consumer 
would stay functional.
   `Consumer#enforceRebalance will be deprecated and will be a no-op if used 
when the new protocol is enable. A warning will be logged in this case.` 
   Do you have a concern that I may be missing?






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383979443


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +307,469 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearAssignedTopicNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+clearAssignedTopicNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearAssignedTopicNamesCache();
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to effectively leave the
+// group (even in the case where the member had no assignment to release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearAssignedTopicNamesCache();
+
+// Return callback future to indicate that the leave group is done when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() {

Review Comment:
   The reason is the leave group logic. On leave group it could be lost or 
revoked, depending on the epoch. On fence or fatal it is always lost. I was just 
reusing the same function for convenience, but I will change the fence and fatal 
transitions to invoke `onPartitionsLost` directly, to make the intention 
clearer.
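
The distinction described here can be sketched as two small decision functions. The class, enum and method names are hypothetical and only mirror the rule stated in the comment and in the javadoc of the diff: the leave-group path picks the callback from the member epoch, while the fenced and fatal paths always report the partitions as lost.

```java
// Hypothetical sketch of the callback choice; not the MembershipManagerImpl code.
final class ReleaseAssignmentSketch {

    enum Callback { ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST, NONE }

    /** Leave group: revoked while still a group member (epoch > 0), lost otherwise. */
    static Callback onLeaveGroup(int memberEpoch, boolean hasAssignment) {
        if (!hasAssignment) {
            return Callback.NONE; // nothing to release, no callback needed
        }
        return memberEpoch > 0 ? Callback.ON_PARTITIONS_REVOKED : Callback.ON_PARTITIONS_LOST;
    }

    /** Fenced or fatal: the member is no longer in the group, so it is always lost. */
    static Callback onFencedOrFatal(boolean hasAssignment) {
        return hasAssignment ? Callback.ON_PARTITIONS_LOST : Callback.NONE;
    }

    public static void main(String[] args) {
        System.out.println(onLeaveGroup(5, true));
        System.out.println(onLeaveGroup(0, true));
        System.out.println(onFencedOrFatal(true));
    }
}
```

Keeping the two paths separate makes the fenced and fatal transitions independent of whatever value the epoch holds at that moment.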






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383974744


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
 @Override
 public void enforceRebalance() {
-throw new KafkaException("method not implemented");
+log.warn("Operation not supported in new consumer group protocol");
 }
 
 @Override
 public void enforceRebalance(String reason) {
-throw new KafkaException("method not implemented");
+log.warn("Operation not supported in new consumer group protocol");

Review Comment:
   Can we just fail the consumer?






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383968073


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Topic.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Objects;
+
+/**
+ * A topic represented by a universally unique identifier and a topic name.
+ */
+public class Topic {
+
+private final Uuid topicId;
+
+private final String topicName;
+
+public Topic(Uuid topicId) {
+this(topicId, null);
+}
+
+public Topic(String topicName) {
+this(null, topicName);
+}
+
+public Topic(Uuid topicId, String topicName) {
+this.topicId = topicId;
+this.topicName = topicName;
+}
+
+public Uuid topicId() {
+return topicId;
+}
+
+public String topicName() {
+return topicName;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+Topic other = (Topic) o;
+return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName);
+}
+
+@Override
+public int hashCode() {
+final int prime = 31;
+int result = prime + Objects.hashCode(topicId);
+result = prime * result + Objects.hashCode(topicName);
+return result;
+}
+
+@Override
+public String toString() {
+return topicId + ":" + topicName;

Review Comment:
   Can we use the standard toString format? Topic(topicId=...)
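
To make the two options concrete, here is a small side-by-side sketch. The class `TopicToStringSketch` and its method names are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and the `topicName=` part of the descriptive form is an assumption, since the comment only spells out `Topic(topicId=...)`.

```java
import java.util.UUID;

// Hypothetical sketch; java.util.UUID is a stand-in for org.apache.kafka.common.Uuid.
final class TopicToStringSketch {

    private final UUID topicId;
    private final String topicName;

    TopicToStringSketch(UUID topicId, String topicName) {
        this.topicId = topicId;
        this.topicName = topicName;
    }

    /** Compact form, as in the diff under review: id and name joined by a colon. */
    String compact() {
        return topicId + ":" + topicName;
    }

    /** Descriptive form in the style suggested by the review comment (field list assumed). */
    String descriptive() {
        return "Topic(topicId=" + topicId + ", topicName=" + topicName + ")";
    }

    public static void main(String[] args) {
        TopicToStringSketch topic = new TopicToStringSketch(UUID.randomUUID(), "orders");
        System.out.println(topic.compact());
        System.out.println(topic.descriptive());
    }
}
```

The compact form is shorter in log lines, while the descriptive form stays unambiguous when one of the two fields is null.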






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383962388


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by the legacy consumer
+throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   You're right, it should happen in the poll to maintain the current contract. 
I will just remove it since we don't execute callbacks yet, and it should be 
included in the PR that introduces the callback execution.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383959347


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -149,27 +192,36 @@ public HeartbeatRequestManager(
  */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat())
+if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat())
 return NetworkClientDelegate.PollResult.EMPTY;
 
-// TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be
-//  implemented either with or after the partition reconciliation logic.
-if (!heartbeatRequestState.canSendRequest(currentTimeMs))
+boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();

Review Comment:
   Yes, as we discussed, this is for the states that send a heartbeat without 
waiting for the interval (basically, sending the ack for an assignment and 
leave group requests).
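
The decision can be sketched as a pure function. The first expression mirrors the `heartbeatNow` line in the diff; how it combines with the regular interval check is an assumption here, with `canSendRequest` standing in for `heartbeatRequestState.canSendRequest(currentTimeMs)`. The class and method names are hypothetical.

```java
// Hypothetical sketch of the heartbeat timing decision; not the HeartbeatRequestManager code.
final class HeartbeatTimingSketch {

    /**
     * @param shouldHeartbeatNow member is in a state that must heartbeat immediately
     *                           (for example acking an assignment or leaving the group)
     * @param requestInFlight    a heartbeat request is already awaiting a response
     * @param canSendRequest     the regular interval/backoff check allows sending
     */
    static boolean shouldSend(boolean shouldHeartbeatNow, boolean requestInFlight, boolean canSendRequest) {
        boolean heartbeatNow = shouldHeartbeatNow && !requestInFlight;
        // Assumed combination: send on the regular schedule, or right away when required.
        return canSendRequest || heartbeatNow;
    }

    public static void main(String[] args) {
        System.out.println(shouldSend(true, false, false));  // immediate heartbeat
        System.out.println(shouldSend(true, true, false));   // wait for the in-flight request
        System.out.println(shouldSend(false, false, false)); // interval not reached yet
    }
}
```

The in-flight guard keeps an immediate heartbeat from being sent while an earlier request is still awaiting its response.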






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383791886


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -149,27 +192,36 @@ public HeartbeatRequestManager(
  */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat())
+if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat())
 return NetworkClientDelegate.PollResult.EMPTY;
 
-// TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be
-//  implemented either with or after the partition reconciliation logic.
-if (!heartbeatRequestState.canSendRequest(currentTimeMs))
+boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();

Review Comment:
   I'm guessing the idea is we need to ignore the backoff and heartbeat.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383786483


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new 
UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by 
the legacy consumer
+throw new KafkaException("User rebalance callback throws an 
error", error);

Review Comment:
   Looking at the current consumer, I think this can get quite complicated in this 
implementation. Maybe what we should do is invoke the listener on the spot, 
then send an event to the background thread to leave the group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383780173


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
 @Override
 public void unsubscribe() {
 fetchBuffer.retainAll(Collections.emptySet());
-subscriptions.unsubscribe();
+UnsubscribeApplicationEvent unsubscribeApplicationEvent = new 
UnsubscribeApplicationEvent();
+applicationEventHandler.add(unsubscribeApplicationEvent);
+unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+if (error != null) {
+// Callback failed - Keeping same exception message thrown by 
the legacy consumer
+throw new KafkaException("User rebalance callback throws an 
error", error);

Review Comment:
   I don't think the exception will actually be thrown here.
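
   A standalone sketch of why, using only `java.util.concurrent.CompletableFuture` (the class and method names are hypothetical and no Kafka types are involved): an exception thrown inside a `whenComplete` action is not rethrown to the code that registered the action. It only affects the stage returned by `whenComplete`, and when the source stage has already failed, that returned stage completes with the source's exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; mirrors the shape of the callback in the diff.
final class WhenCompleteThrowDemo {

    /** Returns true only if registering the action or completing the source throws to the caller. */
    static boolean callerSeesException() {
        CompletableFuture<Void> source = new CompletableFuture<>();
        try {
            // The returned stage is discarded, as in the diff.
            source.whenComplete((result, error) -> {
                if (error != null) {
                    throw new IllegalStateException("callback failed", error);
                }
            });
            source.completeExceptionally(new RuntimeException("boom"));
            return false; // reached: nothing propagated to this thread
        } catch (RuntimeException e) {
            return true;
        }
    }

    /** Returns the failure recorded by the stage that whenComplete returns. */
    static Throwable capturedByDependent() {
        CompletableFuture<Void> source = new CompletableFuture<>();
        CompletableFuture<Void> dependent = source.whenComplete((result, error) -> {
            if (error != null) {
                throw new IllegalStateException("callback failed", error);
            }
        });
        source.completeExceptionally(new RuntimeException("boom"));
        try {
            dependent.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```

   If the same holds for the future in the diff, the `KafkaException` would have no observer, because the stage returned by `whenComplete` is discarded there.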






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383701313


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +307,469 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();

Review Comment:
   As previously mentioned, it is clearer to directly invoke 
`invokeOnPartitionsLost` here. Is there a case where we don't invoke `onPartitionsLost`?






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383699023


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +307,469 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearAssignedTopicNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+clearAssignedTopicNamesCache();
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+clearAssignedTopicNamesCache();
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.PREPARE_LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+clearAssignedTopicNamesCache();
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {

Review Comment:
   Is there any reason we want to combine onPartitionsLost and 
onPartitionsRevoked into a single function? Couldn't we directly invoke 
`invokeOnPartitionsRevoke` or `invokeOnPartitionsLost`? I.e., by the time the 
consumer is fenced, we would know we need to invoke onPartitionsLost.
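
   A rough sketch of the shape being suggested, assuming each transition already knows which callback applies. Everything here is hypothetical: the class is a stand-in for the membership manager, the two `invokeOn...` methods reuse the names from this comment, and their bodies are stubs that only record which callback would run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the membership manager; callbacks are stubbed out.
final class ReleaseAssignmentSketch {

    /** Records which user callback would have been invoked, in order. */
    final List<String> invoked = new ArrayList<>();

    CompletableFuture<Void> invokeOnPartitionsRevoke() {
        invoked.add("onPartitionsRevoked");
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> invokeOnPartitionsLost() {
        invoked.add("onPartitionsLost");
        return CompletableFuture.completedFuture(null);
    }

    /** Intentional leave: the member is still in the group, so partitions are revoked. */
    CompletableFuture<Void> leaveGroup() {
        return invokeOnPartitionsRevoke();
    }

    /** Fenced: the member is no longer in the group, so partitions are lost. */
    CompletableFuture<Void> transitionToFenced() {
        return invokeOnPartitionsLost();
    }
}
```

   The trade-off in this sketch is that the choice of callback becomes explicit at each call site instead of being derived from the member epoch.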






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383658863


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -166,19 +166,41 @@ private static long findMinTime(final Collection request
 .orElse(Long.MAX_VALUE);
 }
 
-public void maybeAutoCommit(final Map 
offsets) {
+/**
+ * Generate a request to commit offsets if auto-commit is enabled. The 
request will be
+ * returned to be sent out on the next call to {@link #poll(long)}. This 
will only generate a
+ * request if there is no other commit request already in-flight, and if 
the commit interval
+ * has elapsed.
+ *
+ * @param offsets Offsets to commit
+ * @return Future that will complete when a response is received for the 
request, or a
+ * completed future if no request is generated.
+ */
+public CompletableFuture maybeAutoCommit(final Map offsets) {
 if (!autoCommitState.isPresent()) {
-return;
+return CompletableFuture.completedFuture(null);
 }
 
 AutoCommitState autocommit = autoCommitState.get();
 if (!autocommit.canSendAutocommit()) {
-return;
+return CompletableFuture.completedFuture(null);
 }
 
-sendAutoCommit(offsets);
+CompletableFuture result = sendAutoCommit(offsets);
 autocommit.resetTimer();
 autocommit.setInflightCommitStatus(true);
+return result;
+}
+
+/**
+ * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned
+ * partitions and their current positions.
+ *
+ * @return Future that will complete when a response is received for the 
request, or a
+ * completed future if no request is generated.
+ */
+public CompletableFuture maybeAutoCommitAllConsumed() {

Review Comment:
   Can we return an optional future here? When no request is generated, there's 
technically nothing to be completed.
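
   For comparison, a small sketch of the two return shapes. The class, method names and the `requestGenerated` flag are hypothetical; the first method follows what the Javadoc in the diff describes (a completed future when no request is generated), the second is the optional variant asked about here.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch contrasting the two return types; no request is actually built.
final class OptionalFutureSketch {

    /** Shape described in the diff's Javadoc: always a future, pre-completed if nothing is sent. */
    static CompletableFuture<Void> asCompletedFuture(boolean requestGenerated) {
        return requestGenerated
                ? new CompletableFuture<Void>()
                : CompletableFuture.<Void>completedFuture(null);
    }

    /** Shape suggested in this comment: no future at all if nothing is sent. */
    static Optional<CompletableFuture<Void>> asOptional(boolean requestGenerated) {
        return requestGenerated
                ? Optional.of(new CompletableFuture<Void>())
                : Optional.<CompletableFuture<Void>>empty();
    }
}
```

   With the first shape, callers can chain on the result unconditionally. With the second, callers can tell "nothing was sent" apart from "sent and already completed", at the cost of an extra presence check.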






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383623183


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +256,465 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group 
anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = LEAVE_GROUP_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
 }
 
+/**
+ * {@inheritDoc}
+ */
 @Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
 }
 
 /**
- * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution 
failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+// group (even in the case where the member had no assignment to 
release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done 
when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member 
is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced 
.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new 
TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke 
onPartitionsLost.
+callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so 
th

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383610788


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -149,15 +149,17 @@ public HeartbeatRequestManager(
  */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || 
!membershipManager.shouldSendHeartbeat())
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.shouldSkipHeartbeat())
 return NetworkClientDelegate.PollResult.EMPTY;
 
-// TODO: We will need to send a heartbeat response after partitions 
being revoke. This needs to be
-//  implemented either with or after the partition reconciliation 
logic.
-if (!heartbeatRequestState.canSendRequest(currentTimeMs))
+boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight();
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs) && 
!heartbeatNow) {
 return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
+}
 
-this.heartbeatRequestState.onSendAttempt(currentTimeMs);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();

Review Comment:
   I'm using `send` to align with how the HB manager names the existing 
`onSendAttempt` and similar methods, and I think that is enough from the 
manager's point of view: the membership manager only needs to know that the 
request has been sent out of the HB manager so that it can transition 
accordingly. It is not literally sent over the network. I would keep send/sent 
for consistency between the two managers, but let me know if you think a 
different name would be clearer.






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-06 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383599082


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -73,30 +84,55 @@ public interface MembershipManager {
 /**
  * @return Current assignment for the member.
  */
-ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
+Set currentAssignment();

Review Comment:
   I will share thoughts on this on our next sync with @dajac and you, 
@AndrewJSchofield, as this is interesting. Just for the record, I do agree that 
spreading topic IDs is the right way forward, and we do have them now in the 
assignment path, but I realized when exploring this suggestion that moving away 
from `TopicPartition` is a much bigger change, and we're not ready for it at 
this point (it is tricky/ugly mainly because not all paths support topic IDs 
yet, but most of them access/update the shared `subscriptionState` component 
and `TopicPartition`).
   
   At this point it looks to me that we're better off making use of the topic 
IDs kind of "on the side", like we're doing now in the `membershipManager`, 
which keeps the assigned topic IDs/names but still uses the same 
`TopicPartition`. This is the same approach followed by the `Fetch` and 
`Metadata` paths, which introduced topic IDs in a similar "on the side" way.
   
   There's interesting food for thought here anyway.
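
   As a rough illustration of the "on the side" idea, assuming nothing about the actual `membershipManager` internals: the sketch below keeps a topic ID to name cache next to a name-based assignment. All names are hypothetical, `java.util.UUID` stands in for Kafka's topic ID type, and plain `"topic-partition"` strings stand in for `TopicPartition`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: topic IDs are tracked in a side cache, while the
// resolved assignment stays name-based.
final class TopicIdsOnTheSide {

    /** Side cache of topic ID -> topic name for assigned topics. */
    private final Map<UUID, String> assignedTopicNames = new HashMap<>();

    /** Called when metadata resolves a topic ID to its name. */
    void onMetadata(UUID topicId, String topicName) {
        assignedTopicNames.put(topicId, topicName);
    }

    /**
     * Resolves an ID-based assignment into name-based "topic-partition" strings.
     * In this sketch, IDs whose names are not known yet are simply skipped.
     */
    List<String> resolve(Map<UUID, List<Integer>> assignmentByTopicId) {
        List<String> resolved = new ArrayList<>();
        for (Map.Entry<UUID, List<Integer>> entry : assignmentByTopicId.entrySet()) {
            String name = assignedTopicNames.get(entry.getKey());
            if (name == null) {
                continue; // name unknown: nothing to hand to the name-based paths yet
            }
            for (int partition : entry.getValue()) {
                resolved.add(name + "-" + partition);
            }
        }
        return resolved;
    }
}
```

   The name-based consumers of the assignment stay untouched in this arrangement, and only the component that talks to the coordinator needs to know about IDs.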
   
  





