lucasbru commented on code in PR #19121:
URL: https://github.com/apache/kafka/pull/19121#discussion_r1983675477
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final
LogContext logContext,
retryBackoffMaxMs,
maxPollIntervalMs
);
+ this.pollTimer = time.timer(maxPollIntervalMs);
}
+ /**
+ * This will build a heartbeat request if one must be sent, determined
based on the member
+ * state. A heartbeat is sent in the following situations:
Review Comment:
These are not separate situations in which a heartbeat must be sent. Maybe
you mean "when all of the following conditions apply" or something similar?
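A minimal sketch of how the reworded Javadoc could read, attached to a small standalone predicate so that the conjunction is explicit. The class name, method name, and the three boolean parameters are hypothetical and are not part of the PR; they only illustrate that both list items must hold together.

```java
final class HeartbeatConditionSketch {

    /**
     * A heartbeat is sent when all of the following conditions apply:
     * <ol>
     *   <li>The member is part of the group or wants to join it.</li>
     *   <li>The heartbeat interval has expired, or the member is in a state that
     *       indicates it should heartbeat without waiting for the interval.</li>
     * </ol>
     *
     * @return {@code true} if a heartbeat should be sent now
     */
    static boolean shouldSendHeartbeat(boolean memberActive,
                                       boolean intervalExpired,
                                       boolean heartbeatImmediately) {
        // Condition 1 AND condition 2; condition 2 is itself a disjunction.
        return memberActive && (intervalExpired || heartbeatImmediately);
    }

    public static void main(String[] args) {
        // An inactive member never heartbeats, even if the interval has expired.
        System.out.println(shouldSendHeartbeat(false, true, false));
        // An active member heartbeats once the interval has expired.
        System.out.println(shouldSendHeartbeat(true, true, false));
    }
}
```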
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final
LogContext logContext,
retryBackoffMaxMs,
maxPollIntervalMs
);
+ this.pollTimer = time.timer(maxPollIntervalMs);
}
+ /**
+ * This will build a heartbeat request if one must be sent, determined
based on the member
+ * state. A heartbeat is sent in the following situations:
+ * <ol>
+ * <li>Member is part of the consumer group or wants to join it.</li>
+ * <li>The heartbeat interval has expired, or the member is in a state
that indicates
+ * that it should heartbeat without waiting for the interval.</li>
+ * </ol>
+ * This will also determine the maximum wait time until the next poll
based on the member's
+ * state.
+ * <ol>
+ * <li>If the member is without a coordinator or is in a failed state,
the timer is set
+ * to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+ * <li>If the member cannot send a heartbeat due to either exponential
backoff, it will
+ * return the remaining time left on the backoff timer.</li>
+ * <li>If the member's heartbeat timer has not expired, It will return
the remaining time
+ * left on the heartbeat timer.</li>
+ * <li>If the member can send a heartbeat, the timer is set to the
current heartbeat interval.</li>
+ * </ol>
+ *
+ * @return {@link
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
that includes a
+ * heartbeat request if one must be sent, and the time to wait
until the next poll.
+ */
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- return new NetworkClientDelegate.PollResult(
- heartbeatRequestState.heartbeatIntervalMs(),
- Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
- );
+ if (coordinatorRequestManager.coordinator().isEmpty() ||
membershipManager.shouldSkipHeartbeat()) {
+ membershipManager.onHeartbeatRequestSkipped();
+ maybePropagateCoordinatorFatalErrorEvent();
+ return NetworkClientDelegate.PollResult.EMPTY;
+ }
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+ logger.warn("Consumer poll timeout has expired. This means the
time between " +
+ "subsequent calls to poll() was longer than the configured
max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too
much time processing " +
+ "messages. You can address this either by increasing
max.poll.interval.ms or by " +
+ "reducing the maximum size of batches returned in poll() with
max.poll.records.");
+
+ membershipManager.onPollTimerExpired();
+ NetworkClientDelegate.UnsentRequest leaveHeartbeat =
makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+ // We can ignore the leave response because we can join before or
after receiving the response.
+ heartbeatRequestState.reset();
+ heartbeatState.reset();
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(leaveHeartbeat));
+ }
+ if (shouldHeartbeatBeforeIntervalExpires() ||
heartbeatRequestState.canSendRequest(currentTimeMs)) {
+ NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(currentTimeMs);
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(request));
+ } else {
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+ }
+ }
+
+ /**
+ * A heartbeat should be sent without waiting for the heartbeat interval
to expire if:
+ * - the member is leaving the group
+ * or
+ * - the member is joining the group or acknowledging the assignment and
for both cases there is no heartbeat request
+ * in flight.
+ * @return
+ */
+ private boolean shouldHeartbeatBeforeIntervalExpires() {
+ return membershipManager.state() == MemberState.LEAVING
+ ||
+ (membershipManager.state() == MemberState.JOINING ||
membershipManager.state() == MemberState.ACKNOWLEDGING)
+ && !heartbeatRequestState.requestInFlight();
+ }
+
+ private void maybePropagateCoordinatorFatalErrorEvent() {
+ coordinatorRequestManager.getAndClearFatalError()
+ .ifPresent(fatalError -> backgroundEventHandler.add(new
ErrorEvent(fatalError)));
+ }
+
+ private NetworkClientDelegate.UnsentRequest
makeHeartbeatRequestOnlyLogResponse(final long currentTimeMs) {
+ return makeHeartbeatRequest(currentTimeMs, this::logResponse);
}
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs) {
- NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
+ return makeHeartbeatRequest(currentTimeMs, this::handleResponse);
Review Comment:
Wow, this `addCompletionCallback` is complex, and it also seems quite
unnecessary. Can I not just do a
`handleResponse(makeHeartbeatRequest(currentTimeMs))` or, even better in my
eyes, implement a method for the lambda inside of `handleResponse` only, and
then do a `makeHeartbeatRequest(currentTimeMs).whenComplete(handleResponse)`?
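A rough sketch of the second option, using simplified stand-ins rather than the real Kafka classes. Here `UnsentRequest` is a hypothetical minimal type whose `whenComplete` returns the request itself, responses are plain strings, and `Manager` only records events; none of this is the actual `NetworkClientDelegate` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class HeartbeatCallbackSketch {

    /** Hypothetical stand-in for the request type; only the fluent callback hook is modeled. */
    static final class UnsentRequest {
        private final CompletableFuture<String> future = new CompletableFuture<>();

        UnsentRequest whenComplete(BiConsumer<String, Throwable> callback) {
            future.whenComplete(callback);
            return this;
        }

        void complete(String response) { future.complete(response); }

        void fail(Throwable error) { future.completeExceptionally(error); }
    }

    /** Hypothetical manager: building the request no longer takes a callback-adding function. */
    static final class Manager {
        final List<String> events = new ArrayList<>();

        UnsentRequest makeHeartbeatRequest(long currentTimeMs) {
            events.add("sent@" + currentTimeMs);
            return new UnsentRequest();
        }

        // Plain callback methods that can be passed as method references.
        void handleResponse(String response, Throwable exception) {
            if (response != null) events.add("handled:" + response);
        }

        void logResponse(String response, Throwable exception) {
            if (response != null) events.add("logged:" + response);
            else events.add("failed:" + exception.getMessage());
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        // The caller picks the callback at the call site.
        UnsentRequest regular = manager.makeHeartbeatRequest(100L).whenComplete(manager::handleResponse);
        UnsentRequest leave = manager.makeHeartbeatRequest(200L).whenComplete(manager::logResponse);
        regular.complete("ok");
        leave.fail(new RuntimeException("disconnected"));
        System.out.println(manager.events);
    }
}
```

One caveat: the callback in the diff reads `request.handler().completionTimeMs()`, so a callback method that receives only the response and exception would need another way to reach the request.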
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final
LogContext logContext,
retryBackoffMaxMs,
maxPollIntervalMs
);
+ this.pollTimer = time.timer(maxPollIntervalMs);
}
+ /**
+ * This will build a heartbeat request if one must be sent, determined
based on the member
+ * state. A heartbeat is sent in the following situations:
+ * <ol>
+ * <li>Member is part of the consumer group or wants to join it.</li>
+ * <li>The heartbeat interval has expired, or the member is in a state
that indicates
+ * that it should heartbeat without waiting for the interval.</li>
+ * </ol>
+ * This will also determine the maximum wait time until the next poll
based on the member's
+ * state.
+ * <ol>
+ * <li>If the member is without a coordinator or is in a failed state,
the timer is set
+ * to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+ * <li>If the member cannot send a heartbeat due to either exponential
backoff, it will
+ * return the remaining time left on the backoff timer.</li>
+ * <li>If the member's heartbeat timer has not expired, It will return
the remaining time
+ * left on the heartbeat timer.</li>
+ * <li>If the member can send a heartbeat, the timer is set to the
current heartbeat interval.</li>
+ * </ol>
+ *
+ * @return {@link
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
that includes a
+ * heartbeat request if one must be sent, and the time to wait
until the next poll.
+ */
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- return new NetworkClientDelegate.PollResult(
- heartbeatRequestState.heartbeatIntervalMs(),
- Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
- );
+ if (coordinatorRequestManager.coordinator().isEmpty() ||
membershipManager.shouldSkipHeartbeat()) {
+ membershipManager.onHeartbeatRequestSkipped();
+ maybePropagateCoordinatorFatalErrorEvent();
+ return NetworkClientDelegate.PollResult.EMPTY;
+ }
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+ logger.warn("Consumer poll timeout has expired. This means the
time between " +
+ "subsequent calls to poll() was longer than the configured
max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too
much time processing " +
+ "messages. You can address this either by increasing
max.poll.interval.ms or by " +
+ "reducing the maximum size of batches returned in poll() with
max.poll.records.");
+
+ membershipManager.onPollTimerExpired();
+ NetworkClientDelegate.UnsentRequest leaveHeartbeat =
makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+ // We can ignore the leave response because we can join before or
after receiving the response.
+ heartbeatRequestState.reset();
+ heartbeatState.reset();
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(leaveHeartbeat));
+ }
+ if (shouldHeartbeatBeforeIntervalExpires() ||
heartbeatRequestState.canSendRequest(currentTimeMs)) {
+ NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(currentTimeMs);
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(request));
+ } else {
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+ }
+ }
+
+ /**
+ * A heartbeat should be sent without waiting for the heartbeat interval
to expire if:
+ * - the member is leaving the group
+ * or
+ * - the member is joining the group or acknowledging the assignment and
for both cases there is no heartbeat request
+ * in flight.
+ * @return
+ */
+ private boolean shouldHeartbeatBeforeIntervalExpires() {
+ return membershipManager.state() == MemberState.LEAVING
+ ||
+ (membershipManager.state() == MemberState.JOINING ||
membershipManager.state() == MemberState.ACKNOWLEDGING)
+ && !heartbeatRequestState.requestInFlight();
+ }
+
+ private void maybePropagateCoordinatorFatalErrorEvent() {
+ coordinatorRequestManager.getAndClearFatalError()
+ .ifPresent(fatalError -> backgroundEventHandler.add(new
ErrorEvent(fatalError)));
+ }
+
+ private NetworkClientDelegate.UnsentRequest
makeHeartbeatRequestOnlyLogResponse(final long currentTimeMs) {
+ return makeHeartbeatRequest(currentTimeMs, this::logResponse);
}
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs) {
- NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
+ return makeHeartbeatRequest(currentTimeMs, this::handleResponse);
+ }
+
+ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs,
+ final
Function<NetworkClientDelegate.UnsentRequest,
NetworkClientDelegate.UnsentRequest> addCompletionCallback) {
+ NetworkClientDelegate.UnsentRequest request =
addCompletionCallback.apply(new NetworkClientDelegate.UnsentRequest(
new
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
coordinatorRequestManager.coordinator()
- );
+ ));
+ heartbeatRequestState.onSendAttempt(currentTimeMs);
+ membershipManager.onHeartbeatRequestGenerated();
+ metricsManager.recordHeartbeatSentMs(currentTimeMs);
+ heartbeatRequestState.resetTimer();
+ return request;
+ }
+
+ private NetworkClientDelegate.UnsentRequest handleResponse(final
NetworkClientDelegate.UnsentRequest request) {
request.whenComplete((response, exception) -> {
long completionTimeMs = request.handler().completionTimeMs();
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
onResponse((StreamsGroupHeartbeatResponse)
response.responseBody(), completionTimeMs);
}
});
- heartbeatRequestState.onSendAttempt(currentTimeMs);
- membershipManager.onHeartbeatRequestGenerated();
- metricsManager.recordHeartbeatSentMs(currentTimeMs);
return request;
}
+ private NetworkClientDelegate.UnsentRequest logResponse(final
NetworkClientDelegate.UnsentRequest request) {
+ return request.whenComplete((response, exception) -> {
+ if (response != null) {
+
metricsManager.recordRequestLatency(response.requestLatencyMs());
+ Errors error = Errors.forCode(((StreamsGroupHeartbeatResponse)
response.responseBody()).data().errorCode());
+ if (error == Errors.NONE)
+ logger.debug("StreamsGroupHeartbeatRequest responded
successfully: {}", response);
+ else
+ logger.error("StreamsGroupHeartbeatRequest failed because
of {}: {}", error, response);
+ } else {
+ logger.error("StreamsGroupHeartbeatRequest failed because of
unexpected exception.", exception);
+ }
+ });
+ }
+
Review Comment:
```suggestion
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final
LogContext logContext,
retryBackoffMaxMs,
maxPollIntervalMs
);
+ this.pollTimer = time.timer(maxPollIntervalMs);
}
+ /**
+ * This will build a heartbeat request if one must be sent, determined
based on the member
+ * state. A heartbeat is sent in the following situations:
+ * <ol>
+ * <li>Member is part of the consumer group or wants to join it.</li>
+ * <li>The heartbeat interval has expired, or the member is in a state
that indicates
+ * that it should heartbeat without waiting for the interval.</li>
+ * </ol>
+ * This will also determine the maximum wait time until the next poll
based on the member's
+ * state.
+ * <ol>
+ * <li>If the member is without a coordinator or is in a failed state,
the timer is set
+ * to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+ * <li>If the member cannot send a heartbeat due to either exponential
backoff, it will
+ * return the remaining time left on the backoff timer.</li>
+ * <li>If the member's heartbeat timer has not expired, It will return
the remaining time
+ * left on the heartbeat timer.</li>
+ * <li>If the member can send a heartbeat, the timer is set to the
current heartbeat interval.</li>
+ * </ol>
+ *
+ * @return {@link
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
that includes a
+ * heartbeat request if one must be sent, and the time to wait
until the next poll.
+ */
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- return new NetworkClientDelegate.PollResult(
- heartbeatRequestState.heartbeatIntervalMs(),
- Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
- );
+ if (coordinatorRequestManager.coordinator().isEmpty() ||
membershipManager.shouldSkipHeartbeat()) {
+ membershipManager.onHeartbeatRequestSkipped();
+ maybePropagateCoordinatorFatalErrorEvent();
+ return NetworkClientDelegate.PollResult.EMPTY;
+ }
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+ logger.warn("Consumer poll timeout has expired. This means the
time between " +
+ "subsequent calls to poll() was longer than the configured
max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too
much time processing " +
+ "messages. You can address this either by increasing
max.poll.interval.ms or by " +
+ "reducing the maximum size of batches returned in poll() with
max.poll.records.");
+
+ membershipManager.onPollTimerExpired();
+ NetworkClientDelegate.UnsentRequest leaveHeartbeat =
makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+ // We can ignore the leave response because we can join before or
after receiving the response.
+ heartbeatRequestState.reset();
+ heartbeatState.reset();
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(leaveHeartbeat));
+ }
+ if (shouldHeartbeatBeforeIntervalExpires() ||
heartbeatRequestState.canSendRequest(currentTimeMs)) {
+ NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(currentTimeMs);
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(request));
+ } else {
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+ }
+ }
+
+ /**
+ * A heartbeat should be sent without waiting for the heartbeat interval
to expire if:
+ * - the member is leaving the group
+ * or
+ * - the member is joining the group or acknowledging the assignment and
for both cases there is no heartbeat request
+ * in flight.
+ * @return
Review Comment:
Something seems to be missing after `@return`.
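A possible completion, sketched on a standalone version of the predicate. The `MemberState` enum here is a reduced, hypothetical stand-in, and the in-flight flag is passed as a parameter instead of being read from `heartbeatRequestState`. The explicit parentheses do not change behavior, because `&&` already binds tighter than `||` in Java; they only make the grouping described in the Javadoc visible.

```java
final class EarlyHeartbeatSketch {

    /** Reduced stand-in for the member states referenced in the diff. */
    enum MemberState { STABLE, JOINING, ACKNOWLEDGING, LEAVING }

    /**
     * A heartbeat should be sent without waiting for the heartbeat interval to expire if:
     * - the member is leaving the group
     * or
     * - the member is joining the group or acknowledging the assignment, and in both
     *   cases there is no heartbeat request in flight.
     *
     * @return {@code true} if a heartbeat should be sent before the interval expires,
     *         {@code false} otherwise
     */
    static boolean shouldHeartbeatBeforeIntervalExpires(MemberState state, boolean requestInFlight) {
        boolean leaving = state == MemberState.LEAVING;
        boolean joiningOrAcknowledging =
            state == MemberState.JOINING || state == MemberState.ACKNOWLEDGING;
        return leaving || (joiningOrAcknowledging && !requestInFlight);
    }

    public static void main(String[] args) {
        // Leaving members heartbeat immediately, even with a request in flight.
        System.out.println(shouldHeartbeatBeforeIntervalExpires(MemberState.LEAVING, true));
        // Joining members wait while a request is still in flight.
        System.out.println(shouldHeartbeatBeforeIntervalExpires(MemberState.JOINING, true));
    }
}
```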
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.