zheguang commented on code in PR #19121:
URL: https://github.com/apache/kafka/pull/19121#discussion_r1982385169
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
retryBackoffMaxMs,
maxPollIntervalMs
);
+ this.pollTimer = time.timer(maxPollIntervalMs);
}
+ /**
+ * This will build a heartbeat request if one must be sent, determined based on the member
+ * state. A heartbeat is sent in the following situations:
+ * <ol>
+ * <li>Member is part of the consumer group or wants to join it.</li>
+ * <li>The heartbeat interval has expired, or the member is in a state that indicates
+ * that it should heartbeat without waiting for the interval.</li>
+ * </ol>
+ * This will also determine the maximum wait time until the next poll based on the member's
+ * state.
+ * <ol>
+ * <li>If the member is without a coordinator or is in a failed state, the timer is set
+ * to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+ * <li>If the member cannot send a heartbeat due to either exponential backoff, it will
+ * return the remaining time left on the backoff timer.</li>
+ * <li>If the member's heartbeat timer has not expired, It will return the remaining time
+ * left on the heartbeat timer.</li>
+ * <li>If the member can send a heartbeat, the timer is set to the current heartbeat interval.</li>
+ * </ol>
+ *
+ * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} that includes a
+ * heartbeat request if one must be sent, and the time to wait until the next poll.
+ */
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- return new NetworkClientDelegate.PollResult(
- heartbeatRequestState.heartbeatIntervalMs(),
- Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
- );
+ if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager.shouldSkipHeartbeat()) {
+ membershipManager.onHeartbeatRequestSkipped();
+ maybePropagateCoordinatorFatalErrorEvent();
+ return NetworkClientDelegate.PollResult.EMPTY;
+ }
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+ logger.warn("Consumer poll timeout has expired. This means the time between " +
+ "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too much time processing " +
+ "messages. You can address this either by increasing max.poll.interval.ms or by " +
+ "reducing the maximum size of batches returned in poll() with max.poll.records.");
+
+ membershipManager.onPollTimerExpired();
+ NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+ // We can ignore the leave response because we can join before or after receiving the response.
+ heartbeatRequestState.reset();
+ heartbeatState.reset();
+ return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
+ }
+ if (shouldHeartbeatBeforeIntervalExpires() || heartbeatRequestState.canSendRequest(currentTimeMs)) {
+ NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs);
+ return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
+ } else {
+ return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+ }
+ }
+
+ /**
+ * A heartbeat should be sent without waiting for the heartbeat interval to expire if:
+ * - the member is leaving the group
+ * or
+ * - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
+ * in flight.
+ * @return
+ */
+ private boolean shouldHeartbeatBeforeIntervalExpires() {
+ return membershipManager.state() == MemberState.LEAVING
+ ||
+ (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING)
+ && !heartbeatRequestState.requestInFlight();
Review Comment:
🤔 Wondering if the closing parenthesis is off by one clause? Reading the
comment above, it seems this was intended instead:
```java
||
(membershipManager.state() == MemberState.JOINING ||
membershipManager.state() == MemberState.ACKNOWLEDGING
&& !heartbeatRequestState.requestInFlight());
```
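
To make the two groupings easier to compare, here is a minimal standalone sketch. It relies only on the Java rule that `&&` binds tighter than `||`. The `State` enum and the `inFlight` flag are hypothetical stand-ins for `MemberState` and `heartbeatRequestState.requestInFlight()`, not the real Kafka classes, so treat it as an illustration rather than a test of the PR itself.

```java
public class PrecedenceSketch {
    // Hypothetical stand-in for Kafka's MemberState; only the values needed here.
    enum State { LEAVING, JOINING, ACKNOWLEDGING, STABLE }

    // Parenthesization as in the PR. Because && binds tighter than ||, this reads as:
    // LEAVING || ((JOINING || ACKNOWLEDGING) && !inFlight)
    static boolean asWritten(State s, boolean inFlight) {
        return s == State.LEAVING
            || (s == State.JOINING || s == State.ACKNOWLEDGING)
            && !inFlight;
    }

    // Parenthesization as in the suggestion above. This reads as:
    // LEAVING || (JOINING || (ACKNOWLEDGING && !inFlight))
    static boolean asSuggested(State s, boolean inFlight) {
        return s == State.LEAVING
            || (s == State.JOINING
            || s == State.ACKNOWLEDGING && !inFlight);
    }

    public static void main(String[] args) {
        // Print the full truth table so the two forms can be compared row by row.
        for (State s : State.values()) {
            for (boolean inFlight : new boolean[] {false, true}) {
                System.out.println(s + " inFlight=" + inFlight
                    + " asWritten=" + asWritten(s, inFlight)
                    + " asSuggested=" + asSuggested(s, inFlight));
            }
        }
    }
}
```

In this sketch the two forms disagree in exactly one row: `JOINING` with a request in flight, where `asWritten` is false and `asSuggested` is true. Which of the two matches the Javadoc ("for both cases there is no heartbeat request in flight") is the point worth confirming.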