lucasbru commented on code in PR #19233:
URL: https://github.com/apache/kafka/pull/19233#discussion_r2009935567
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -366,6 +375,53 @@ public NetworkClientDelegate.PollResult poll(long
currentTimeMs) {
}
}
+ /**
+ * Generate a heartbeat request to leave the group if the state is still
LEAVING when this is
+ * called to close the consumer.
+ * <p/>
+ * Note that when closing the consumer, even though an event to
Unsubscribe is generated
+ * (triggers callbacks and sends leave group), it could be the case that
the Unsubscribe event
+ * processing does not complete in time and moves on to close the managers
(ex. calls to
+ * close with zero timeout). So we could end up on this pollOnClose with
the member in
+ * {@link MemberState#PREPARE_LEAVING} (ex. app thread did not have the
time to process the
+ * event to execute callbacks), or {@link MemberState#LEAVING} (ex. the
leave request could
+ * not be sent due to coordinator not available at that time). In all
cases, the pollOnClose
+ * will be triggered right before sending the final requests, so we ensure
that we generate
+ * the request to leave if needed.
+ *
+ * @param currentTimeMs The current system time in milliseconds at which
the method was called
+ * @return PollResult containing the request to send
+ */
+ @Override
+ public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
+ if (membershipManager.isLeavingGroup()) {
+ NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequestAndLogResponse(currentTimeMs);
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
List.of(request));
+ }
+ return EMPTY;
+ }
+
+ /**
+ * Returns the delay for which the application thread can safely wait
before it should be responsive
+ * to results from the request managers. For example, the subscription
state can change when heartbeats
+ * are sent, so blocking for longer than the heartbeat interval might mean
the application thread is not
+ * responsive to changes.
+ *
+ * <p>Similarly, we may have to unblock the application thread to send a
`PollApplicationEvent` to make sure
+ * our poll timer will not expire while we are polling.
+ *
+ * <p>In the event that heartbeats are currently being skipped, this still
returns the next heartbeat
+ * delay rather than {@code Long.MAX_VALUE} so that the application thread
remains responsive.
+ */
+ @Override
+ public long maximumTimeToWait(long currentTimeMs) {
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired() ||
(membershipManager.shouldNotWaitForHeartbeatInterval() &&
!heartbeatRequestState.requestInFlight())) {
Review Comment:
unnecessary parantheses
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]