This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96196bb03ba KAFKA-18736: Add pollOnClose() and maximumTimeToWait()
(#19233)
96196bb03ba is described below
commit 96196bb03ba921ff0b8b763205424a58b36ecf60
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Mar 25 09:09:13 2025 +0100
KAFKA-18736: Add pollOnClose() and maximumTimeToWait() (#19233)
Adds pollOnClose() and maximumTimeToWait() to the Streams
group heartbeat request manager.
Reviewers: Lucas Brutschy <[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 51 +++++++++
.../StreamsGroupHeartbeatRequestManagerTest.java | 127 +++++++++++++++++++++
2 files changed, 178 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index bf67b953dad..08aa6b6927f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -49,6 +49,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
+
/**
* <p>Manages the request creation and response handling for the Streams group
heartbeat. The class creates a
* heartbeat request using the state stored in the membership manager. The
requests can be retrieved
@@ -374,6 +376,55 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
}
}
+ /**
+ * Generate a heartbeat request to leave the group if the state is still
LEAVING when this is
+ * called to close the consumer.
+ * <p/>
+ * Note that when closing the consumer, even though an event to
Unsubscribe is generated
+ * (triggers callbacks and sends leave group), it could be the case that
the Unsubscribe event
+ * processing does not complete in time and moves on to close the managers
(ex. calls to
+ * close with zero timeout). So we could end up on this pollOnClose with
the member in
+ * {@link MemberState#PREPARE_LEAVING} (ex. app thread did not have the
time to process the
+ * event to execute callbacks), or {@link MemberState#LEAVING} (ex. the
leave request could
+ * not be sent due to coordinator not available at that time). In all
cases, the pollOnClose
+ * will be triggered right before sending the final requests, so we ensure
that we generate
+ * the request to leave if needed.
+ *
+ * @param currentTimeMs The current system time in milliseconds at which
the method was called
+ * @return PollResult containing the request to send
+ */
+ @Override
+ public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
+ if (membershipManager.isLeavingGroup()) {
+ NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequestAndLogResponse(currentTimeMs);
+ return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
List.of(request));
+ }
+ return EMPTY;
+ }
+
+ /**
+ * Returns the delay for which the application thread can safely wait
before it should be responsive
+ * to results from the request managers. For example, the subscription
state can change when heartbeats
+ * are sent, so blocking for longer than the heartbeat interval might mean
the application thread is not
+ * responsive to changes.
+ *
+ * <p>Similarly, we may have to unblock the application thread to send a
`PollApplicationEvent` to make sure
+ * our poll timer will not expire while we are polling.
+ *
+ * <p>In the event that heartbeats are currently being skipped, this still
returns the next heartbeat
+ * delay rather than {@code Long.MAX_VALUE} so that the application thread
remains responsive.
+ */
+ @Override
+ public long maximumTimeToWait(long currentTimeMs) {
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired() ||
+ membershipManager.shouldNotWaitForHeartbeatInterval() &&
!heartbeatRequestState.requestInFlight()) {
+
+ return 0L;
+ }
+ return Math.min(pollTimer.remainingMs() / 2,
heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+ }
+
/**
* A heartbeat should be sent without waiting for the heartbeat interval
to expire if:
* - the member is leaving the group
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 126be01e1f5..dae6958035b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -1382,6 +1383,132 @@ class StreamsGroupHeartbeatRequestManagerTest {
.map(Arguments::of);
}
+ @Test
+ public void testPollOnCloseWhenIsNotLeaving() {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager =
createStreamsGroupHeartbeatRequestManager();
+
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.pollOnClose(time.milliseconds());
+
+ assertEquals(NetworkClientDelegate.PollResult.EMPTY, result);
+ }
+
+ @Test
+ public void testPollOnCloseWhenIsLeaving() {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager =
createStreamsGroupHeartbeatRequestManager();
+ when(membershipManager.isLeavingGroup()).thenReturn(true);
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_MEMBER_EPOCH);
+
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.pollOnClose(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ final NetworkClientDelegate.UnsentRequest networkRequest =
result.unsentRequests.get(0);
+ StreamsGroupHeartbeatRequest streamsRequest =
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
+ assertEquals(GROUP_ID, streamsRequest.data().groupId());
+ assertEquals(MEMBER_ID, streamsRequest.data().memberId());
+ assertEquals(LEAVE_GROUP_MEMBER_EPOCH,
streamsRequest.data().memberEpoch());
+ }
+
+ @Test
+ public void testMaximumTimeToWaitPollTimerExpired() {
+ try (
+ final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class, (mock, context) -> {
+ when(mock.isExpired()).thenReturn(true);
+ });
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+ when(mock.requestInFlight()).thenReturn(false);
+ })
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
timerMockedConstruction.constructed().get(0);
+ time.sleep(1234);
+
+ final long maximumTimeToWait =
heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
+
+ assertEquals(0, maximumTimeToWait);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @Test
+ public void testMaximumTimeToWaitWhenHeartbeatShouldBeSentImmediately() {
+ try (
+ final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class);
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+ when(mock.requestInFlight()).thenReturn(false);
+ })
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
timerMockedConstruction.constructed().get(0);
+
when(membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(true);
+ time.sleep(1234);
+
+ final long maximumTimeToWait =
heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
+
+ assertEquals(0, maximumTimeToWait);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @ParameterizedTest
+ @CsvSource({"true, false", "false, false", "true, true"})
+ public void
testMaximumTimeToWaitWhenHeartbeatShouldBeNotSentImmediately(final boolean
isRequestInFlight,
+
final boolean shouldNotWaitForHeartbeatInterval) {
+ final long remainingMs = 12L;
+ final long timeToNextHeartbeatMs = 6L;
+ try (
+ final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class, (mock, context) -> {
+ when(mock.remainingMs()).thenReturn(remainingMs);
+ });
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+ when(mock.requestInFlight()).thenReturn(isRequestInFlight);
+
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
+ })
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
timerMockedConstruction.constructed().get(0);
+
when(membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(shouldNotWaitForHeartbeatInterval);
+ time.sleep(1234);
+
+ final long maximumTimeToWait =
heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
+
+ assertEquals(timeToNextHeartbeatMs, maximumTimeToWait);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
+ @ParameterizedTest
+ @CsvSource({"12, 5", "10, 6"})
+ public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long
remainingMs,
+ final long
timeToNextHeartbeatMs) {
+ try (
+ final MockedConstruction<Timer> timerMockedConstruction =
mockConstruction(Timer.class, (mock, context) -> {
+ when(mock.remainingMs()).thenReturn(remainingMs);
+ });
+ final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) -> {
+
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
+ })
+ ) {
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+ final Timer pollTimer =
timerMockedConstruction.constructed().get(0);
+ time.sleep(1234);
+
+ final long maximumTimeToWait =
heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
+
+ assertEquals(5, maximumTimeToWait);
+ verify(pollTimer).update(time.milliseconds());
+ }
+ }
+
private static ConsumerConfig config() {
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);