Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru merged PR #14873: URL: https://github.com/apache/kafka/pull/14873 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1851574858 Test failures are unrelated.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1423372314

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -948,6 +948,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // 1 consumer using range assignment
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "range-group")
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "3")

Review Comment: The test is pretty flaky with the default 6s, so I set it to a higher value.
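For readers outside the test harness, the override discussed in this comment could be sketched in plain Java as follows. This is only an illustration under assumptions: the class and helper names are hypothetical, the 30000 ms value is taken from the "30s" mentioned elsewhere in this thread rather than from the diff, and raw string keys are used instead of the `ConsumerConfig` constants so the snippet runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of the PR: builds consumer properties with an
// explicit max.poll.interval.ms.
final class MaxPollIntervalConfigSketch {

    static Properties consumerProps(String groupId, long maxPollIntervalMs) {
        Properties props = new Properties();
        // "group.id" and "max.poll.interval.ms" are the standard consumer config keys.
        props.setProperty("group.id", groupId);
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Assumed value: 30 seconds, expressed in milliseconds.
        Properties props = consumerProps("range-group", 30_000L);
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

A longer interval gives a slow test more room between `poll()` calls before the member is considered to have stopped polling.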
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
cadonna commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1423147503

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+    /**
+     * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
+     * member is in the {@link MemberState#UNSUBSCRIBED} state.
+     */
+    public void resetPollTimer() {
+        pollTimer.reset(maxPollIntervalMs);
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+                                                                     final boolean ignoreResponse) {
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestSent();
+        return request;
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-                new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-                coordinatorRequestManager.coordinator());
+            new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator());
         return request.whenComplete((response, exception) -> {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+                // The response is only ignore when the member becomes staled. This is because the member needs to
+                // rejoin on the next poll regardless the server has responded to the heartbeat.
+                if (!ignoreResponse) {
+                    membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+                    maybeSendGroupMetadataUpdateEvent();

Review Comment: OK, thank you for the clarification! Sorry, but I still have two comments:

1. Why do we still need to call `onResponse()`? As far as I understand, everything we do there is not strictly needed because we will do the same when we restart heartbeating in the next poll, won't we? Is the error handling in this case so important? Wouldn't it be equally fine to not handle errors until we start heartbeating? I am asking because the code, and probably also the reasoning about the code, would become simpler if we just completely ignored the response. I also see that it is not a big deal doing it one way or the other. So, if you do not want to change it, it is fine with me. However, I just realized that with this change we will call `membershipManager.onHeartbeatResponseReceived()` and `maybeSendGroupMetadataUpdateEvent()` in the error case, won't we? Before the change these two calls were only made when there was no error. Is this acceptable?

2. I understand that the commit gets rejected by the brokers. My question was whether we should avoid sending a commit request in the first place, since the consumer actually knows that it is not part of a group anymore.

A last question:

> That being said, following commit requests issued after the consumer left the group will succeed because they will be sent without member ID or epoch (noticing that the member is not part of the group anymore).

I am not sure I understand this. Why does the commit succeed?
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1423103409

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+    /**
+     * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
+     * member is in the {@link MemberState#UNSUBSCRIBED} state.
+     */
+    public void resetPollTimer() {
+        pollTimer.reset(maxPollIntervalMs);
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+                                                                     final boolean ignoreResponse) {
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestSent();
+        return request;
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-                new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-                coordinatorRequestManager.coordinator());
+            new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator());
         return request.whenComplete((response, exception) -> {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+                // The response is only ignore when the member becomes staled. This is because the member needs to
+                // rejoin on the next poll regardless the server has responded to the heartbeat.
+                if (!ignoreResponse) {
+                    membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+                    maybeSendGroupMetadataUpdateEvent();

Review Comment: Thanks for the clarification, @lianetm.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850852690 Per request: https://issues.apache.org/jira/browse/KAFKA-15993
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850844332 Hey @lianetm - You are right. The test uses a rebalance callback, so it will need to be enabled after the rebalance callback is implemented. However, I did verify the leave group request by setting a breakpoint. Is there a jira for the rebalance callback invocation? I assume it should be part of the task.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1423084344

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+    /**
+     * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
+     * member is in the {@link MemberState#UNSUBSCRIBED} state.
+     */
+    public void resetPollTimer() {
+        pollTimer.reset(maxPollIntervalMs);
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+                                                                     final boolean ignoreResponse) {
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestSent();
+        return request;
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-                new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-                coordinatorRequestManager.coordinator());
+            new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator());
         return request.whenComplete((response, exception) -> {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+                // The response is only ignore when the member becomes staled. This is because the member needs to
+                // rejoin on the next poll regardless the server has responded to the heartbeat.
+                if (!ignoreResponse) {
+                    membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+                    maybeSendGroupMetadataUpdateEvent();

Review Comment: To complete the picture regarding the commits: if a commit was sent out right before leaving, it will include a memberId and epoch that are not valid anymore, so the broker will reject it, as @philipnee mentioned, with UNKNOWN_MEMBER_ID, and the client will fail the commit with that unrecoverable error. That being said, subsequent commit requests issued after the consumer left the group will succeed because they will be sent without member ID or epoch (since the client notices that the member is not part of the group anymore). That logic is in the commitRequestManager, but I am sharing it here to connect the dots.
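The two commit cases described in this comment can be illustrated with a toy model. This is a sketch under loud assumptions, not the logic in the commit request manager or in the broker: the class, both methods, and the string results are hypothetical, and the "broker" here checks only whether a named member is still known, which is far simpler than the real validation.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the behaviour described in the review comment.
final class CommitIdentitySketch {

    /** Client side: attach the member ID only while the consumer believes it is in the group. */
    static Optional<String> memberIdForCommit(boolean inGroup, String memberId) {
        return inGroup ? Optional.of(memberId) : Optional.empty();
    }

    /** Toy broker check: a commit that names a member must name a currently known one. */
    static String brokerCheck(Optional<String> sentMemberId, Set<String> currentMembers) {
        if (sentMemberId.isPresent() && !currentMembers.contains(sentMemberId.get())) {
            return "UNKNOWN_MEMBER_ID";
        }
        return "NONE";
    }

    public static void main(String[] args) {
        // A commit built just before leaving still names member "m1",
        // but the group no longer contains "m1" when the commit arrives.
        Optional<String> inFlight = memberIdForCommit(true, "m1");
        System.out.println(brokerCheck(inFlight, Set.of()));

        // A commit built after leaving carries no member ID at all.
        Optional<String> afterLeave = memberIdForCommit(false, "m1");
        System.out.println(brokerCheck(afterLeave, Set.of()));
    }
}
```

In this model the first commit is rejected and the second is accepted, which mirrors the distinction drawn in the comment between a commit in flight at the time of leaving and one issued afterwards.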
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850787095 @philipnee thanks for the updates! One last comment. Important follow-up to this PR is to enable the specific max poll related integration tests (like `testMaxPollIntervalMs`), that are currently disabled because of the dependency on the callbacks. Is there a jira for that so we make sure we run them asap? Thanks!
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850740142 @philipnee thanks for the updates. @cadonna I'll wait for you to green-light this by approving.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850712115

Hi @lianetm and @lucasbru - Per your request, I've just run the tests using
```
Arguments.of("kraft+kip848", "consumer"))
```
All tests passed except `testRemoteAssignorRange`. Per Lucas's suggestion, I extended max.poll.interval.ms to 30s, and the test passed.
```
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testAssignAndConsumeFromCommittedOffsets(String, String) > testAssignAndConsumeFromCommittedOffsets(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSubsequentPatternSubscription(String, String) > testSubsequentPatternSubscription(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSubsequentPatternSubscription(String, String) > testSubsequentPatternSubscription(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSubsequentPatternSubscription(String, String) > testSubsequentPatternSubscription(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPerPartitionLeadMetricsCleanUpWithAssign(String, String) > testPerPartitionLeadMetricsCleanUpWithAssign(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testNullGroupIdNotSupportedIfCommitting(String, String) >
```
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850676664 Hi @cadonna - Sorry about unintentionally ignoring your question. I've just responded to it.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1422983344

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+    /**
+     * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
+     * member is in the {@link MemberState#UNSUBSCRIBED} state.
+     */
+    public void resetPollTimer() {
+        pollTimer.reset(maxPollIntervalMs);
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+                                                                     final boolean ignoreResponse) {
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestSent();
+        return request;
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-                new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-                coordinatorRequestManager.coordinator());
+            new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator());
         return request.whenComplete((response, exception) -> {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+                // The response is only ignore when the member becomes staled. This is because the member needs to
+                // rejoin on the next poll regardless the server has responded to the heartbeat.
+                if (!ignoreResponse) {
+                    membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+                    maybeSendGroupMetadataUpdateEvent();

Review Comment: For calling onResponse: we need to try our best to handle the error if there is one. However, when leaving a group, we don't need to process `onHeartbeatResponseReceived` because we don't need to reconcile.

After leaving the group, the request manager should not keep heartbeating. For this specific PR, when the poll timer expires, the request manager will return an empty response when it gets polled. See the first if block in poll(). And you are right: when the user polls the consumer, it resets the timer, and on the next request manager poll (they happen asynchronously), it should send a heartbeat to rejoin.

I think that if we commit after leaving the group, the commit will get rejected with an error.
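The interplay described in this comment between the application's poll, the poll timer, and the request manager's own poll loop could be sketched roughly as below. This is a simplified, hypothetical model: the class, method names, and string results are made up for illustration, the heartbeat interval is ignored entirely, and it is not the code in `HeartbeatRequestManager`.

```java
// Hypothetical model of a poll timer gating heartbeats.
final class PollTimerSketch {
    private final long maxPollIntervalMs;
    private long deadlineMs;
    private boolean leftGroup = false;

    PollTimerSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    /** Application thread: every consumer poll pushes the deadline out again. */
    void onConsumerPoll(long nowMs) {
        deadlineMs = nowMs + maxPollIntervalMs;
    }

    /** Background thread: decides what, if anything, to send on this pass. */
    String pollManager(long nowMs) {
        boolean expired = nowMs >= deadlineMs;
        if (expired) {
            if (leftGroup) {
                return "NONE"; // already left: stay silent until the application polls again
            }
            leftGroup = true;
            return "LEAVE_HEARTBEAT"; // one last heartbeat to leave the group
        }
        if (leftGroup) {
            leftGroup = false;
            return "JOIN_HEARTBEAT"; // timer was reset by the application: rejoin
        }
        return "HEARTBEAT";
    }

    public static void main(String[] args) {
        PollTimerSketch timer = new PollTimerSketch(100, 0);
        System.out.println(timer.pollManager(50));
        System.out.println(timer.pollManager(100));
        System.out.println(timer.pollManager(150));
        timer.onConsumerPoll(160);
        System.out.println(timer.pollManager(170));
    }
}
```

The two methods are called from different threads in the scenario the comment describes; the sketch leaves out synchronization to keep the control flow visible.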
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
cadonna commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1849541992

> @lianetm / @cadonna any more comments on this?

@lucasbru I did not get any answer to the following comment: https://github.com/apache/kafka/pull/14873#discussion_r1418729793
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
cadonna commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1422075316

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+    /**
+     * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
+     * member is in the {@link MemberState#UNSUBSCRIBED} state.
+     */
+    public void resetPollTimer() {
+        pollTimer.reset(maxPollIntervalMs);
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+                                                                     final boolean ignoreResponse) {
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestSent();
+        return request;
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-                new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-                coordinatorRequestManager.coordinator());
+            new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator());
         return request.whenComplete((response, exception) -> {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+                // The response is only ignore when the member becomes staled. This is because the member needs to
+                // rejoin on the next poll regardless the server has responded to the heartbeat.
+                if (!ignoreResponse) {
+                    membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+                    maybeSendGroupMetadataUpdateEvent();

Review Comment: @philipnee Any thoughts about my comment?
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1849099041 RemoteRangeAssignorTest is still failing for this one
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1848609666 Thanks for the changes @philipnee, LGTM. I would only +1 @lucasbru's suggestion of locally running the existing `PlaintextConsumerTest` integration test on this PR, as it is currently disabled in the CI. It would also be good to try to get some of the max poll related ones to run (OK if in a different PR, as I expect it will require some changes due to the listeners they use that we don't support yet).
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1847607910 @lianetm - Thanks for the feedback and I've made some changes according to your comment.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1420828282

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
         return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
     }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToStaled() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        transitionTo(MemberState.LEAVING);
+        leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
+        transitionTo(MemberState.STALED);

Review Comment: Never mind - I was confused by the pending assignments. I think we need to clear the current assignment.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1420817920

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
         return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
     }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToStaled() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        transitionTo(MemberState.LEAVING);
+        leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
+        transitionTo(MemberState.STALED);

Review Comment: I think transitionToJoining, which takes place when sending out the heartbeat, already clears the current assignment.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1420804236

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
         return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
     }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToStaled() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        transitionTo(MemberState.LEAVING);
+        leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
+        transitionTo(MemberState.STALED);

Review Comment: Thanks, that makes a lot of sense.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1420598917

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
         return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
     }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToStaled() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        transitionTo(MemberState.LEAVING);
+        leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
+        transitionTo(MemberState.STALED);

Review Comment: We also need to make sure that when going down this path, we clear the current assignment, so the member actually rejoins with its subscription, but ready to take on whatever new partitions the broker assigns.
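The point of this comment, that a member going stale should drop its current assignment but keep its subscription, can be shown with a small state model. This is a hypothetical sketch, not `MembershipManagerImpl`: the class, the reduced set of states, and the `rejoin()` method are made up for illustration, and the epoch constants (-1 to leave, 0 to join) are assumed values standing in for the constants on `ConsumerGroupHeartbeatRequest`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily reduced membership model.
final class StaledMemberSketch {
    enum State { STABLE, STALED, JOINING }

    // Assumed placeholder values for the leave and join member epochs.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    static final int JOIN_GROUP_MEMBER_EPOCH = 0;

    State state = State.STABLE;
    int memberEpoch;
    final Set<String> subscription = new HashSet<>();
    final Set<String> assignment = new HashSet<>();

    /** Poll timer expired: leave the group, drop owned partitions, keep the subscription. */
    void transitionToStaled() {
        memberEpoch = LEAVE_GROUP_MEMBER_EPOCH;
        assignment.clear();
        state = State.STALED;
    }

    /** Application polled again: rejoin with the same subscription and no assignment. */
    void rejoin() {
        if (state != State.STALED) {
            throw new IllegalStateException("rejoin() is only modelled from STALED");
        }
        memberEpoch = JOIN_GROUP_MEMBER_EPOCH;
        state = State.JOINING;
    }

    public static void main(String[] args) {
        StaledMemberSketch member = new StaledMemberSketch();
        member.memberEpoch = 5;
        member.subscription.add("orders");
        member.assignment.add("orders-0");
        member.assignment.add("orders-1");

        member.transitionToStaled();
        member.rejoin();
        System.out.println(member.state + " subscription=" + member.subscription
            + " assignment=" + member.assignment);
    }
}
```

Whether the clearing happens in the stale transition itself or later, when the member transitions to joining, is exactly what the surrounding review comments discuss; the sketch arbitrarily does it in the stale transition.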
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1420570473 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() { return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; } +/** + * {@inheritDoc} + */ +@Override +public void transitionToStaled() { +transitionTo(MemberState.PREPARE_LEAVING); Review Comment: It looks to me like this is not really needed here. We use this state when we take some action before sending the last HB (mainly callbacks), but in this case there is no async operation to do; we just send the last HB, right? That is done explicitly in the HB mgr when it checks that the poll timer has expired.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1420576356 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() { return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; } +/** + * {@inheritDoc} + */ +@Override +public void transitionToStaled() { +transitionTo(MemberState.PREPARE_LEAVING); +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +transitionTo(MemberState.LEAVING); Review Comment: Similarly, do we need this? We use the LEAVING state to signal the HB manager that it should send a HB now, but in this staled-member case the last HB is sent based on the check that the poll timer expired (not based on any state that triggers an immediate HB).
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1847153287 More generally, can you enable the integration tests for this PR? Can you also enable `testMaxPollIntervalMs`?
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1846962550 There is a suspicious test failure: https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=Europe%2FVienna=kafka.api.PlaintextConsumerTest=testRemoteAssignorRange(String%2C%20String)%5B1%5D It doesn't seem to fail on master at all, but it fails consistently for this PR. It failed on one other PR once. We probably have to look at this.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1845782589 There are quite a few failing tests, so I rebased to try to resolve them.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
cadonna commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1418729793 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } -private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { +/** + * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the + * member is in the {@link MemberState#UNSUBSCRIBED} state. + */ +public void resetPollTimer() { +pollTimer.reset(maxPollIntervalMs); +} + +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, + final boolean ignoreResponse) { +NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); +heartbeatRequestState.onSendAttempt(currentTimeMs); +membershipManager.onHeartbeatRequestSent(); +return request; +} + +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( -new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), -coordinatorRequestManager.coordinator()); +new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), +coordinatorRequestManager.coordinator()); return request.whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs()); +// The response is only ignore when the member becomes staled. This is because the member needs to +// rejoin on the next poll regardless the server has responded to the heartbeat. 
+if (!ignoreResponse) { + membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data()); +maybeSendGroupMetadataUpdateEvent(); Review Comment: If the heartbeat request manager ignores the response, why does it call `onResponse()`? Does the heartbeat request manager continue to send heartbeats although the consumer left the group? If yes, what is the reason for it? I would have expected that the consumer leaves the group and stops heartbeating. Then at the next call to `Consumer#poll()` it should join the group again. Another question: how does the consumer avoid committing offsets (or sending any other group-related request) after the poll timer has expired and the consumer is in the staled state? The broker will probably reject the commit, but the consumer proactively left the group, so it knows that it is no longer part of the group and could simply not send the offset commit.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1843583099 Hi @lianetm @cadonna @lucasbru - Thanks for taking the time to review my PR. I addressed your comments and left one for @cadonna.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1417898446 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); Review Comment: You are 100% correct. I submitted a patch for this specific test. Thanks for catching this.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1417795477 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } -private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { +/** + * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the + * member is in the {@link MemberState#UNSUBSCRIBED} state. + */ +public void resetPollTimer() { +pollTimer.reset(maxPollIntervalMs); +} + +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, + final boolean ignoreResponse) { +NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); +heartbeatRequestState.onSendAttempt(currentTimeMs); +membershipManager.onHeartbeatRequestSent(); +return request; +} + +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( -new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), -coordinatorRequestManager.coordinator()); +new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), +coordinatorRequestManager.coordinator()); return request.whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs()); +// The response is only ignore when the member becomes staled. This is because the member needs to +// rejoin on the next poll regardless the server has responded to the heartbeat. 
+if (!ignoreResponse) { + membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data()); +maybeSendGroupMetadataUpdateEvent(); Review Comment: @cadonna - can you comment on this change?
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1843296141 @philipnee agreed on letting the membership manager determine the epoch to use when leaving the group. That's already in the [state machine](https://github.com/apache/kafka/blob/adcbdaca72a3484d729e79061b947bcb9c71533a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L568) PR, which is in review but not merged yet.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416992430 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } +/** + * When consumer polls, we need to reset the pollTimer. If member is already leaving the group + */ +public void ack() { +pollTimer.reset(rebalanceTimeoutMs); +} + +Timer pollTimer() { Review Comment: I was assuming that after passing the poll timer in from the outside, we don't need to make this visible for testing anymore. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -137,4 +142,15 @@ public interface MembershipManager { * Note that list of topics of the subscription is taken from the shared subscription state. */ void onSubscriptionUpdated(); + +/** + * Transition to the {@link MemberState#JOINING} state to attempt joining a group. + */ +void transitionToJoining(); + +/** + * When the user stops polling the consumer, the member will be transitioned to LEAVING without revoking the Review Comment: How does the comment about `LEAVING` relate to this method? Maybe it needs to be updated?
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
cadonna commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416983368 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager { */ private final BackgroundEventHandler backgroundEventHandler; +/** + * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop + * sending heartbeat until the next poll. + */ Review Comment: Oh, I was not aware of the static membership handling in KIP-848. I should have also looked into that instead of only looking into the legacy consumer. Thanks for the clarification! Yes, I agree that consumers that use group management should always send a leave group as KIP-848 describes. So the comment is wrong. I would propose to just remove the whole comment and give the timer a descriptive name. In general I find inline comments problematic because -- at some point -- they start to lie because they are never properly maintained. Instead I prefer to use meaningful and descriptive names and to extract code that needs a comment into methods that either have a meaningful name or a comment on the method, preferring the meaningful name. What I just wrote applies in most cases but not in all, but it definitely applies here.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842164667 Hi @lianetm - Thanks for taking the time to review the PR. Instead of using boolean flags, I added a _STALED_ state to address your concern. I think we do need this state to stay consistent with the state logic, i.e., I agree that unsubscribed is probably not the right state to be in as it hints to the user that it needs to subscribe. Let me know what you think. For the static member - I think the right thing to do is to let the membershipManager decide the right epoch to send.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842032321 Thanks for the changes @philipnee, left answers for the static membership and some comments.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416614219 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); Review Comment: I know we've all been writing this kind of unit test that strays into integration-test land, but if we want to start correcting/simplifying that, this unit test could be simplified. Since it is for the HB manager, it should just `verify` the call to the membershipManager func when the timer expires (plus another test in the MembershipMgr for that onStale func, which is needed anyway, I would say).
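A rough sketch of the narrower test shape suggested above: the heartbeat manager is checked only for notifying its collaborator when the poll deadline passes. Everything here is hypothetical (`MembershipSketch`, `RecordingMembership`, `HeartbeatManagerSketch`), and a hand-written recording fake stands in for a mocking library's `verify` so the example has no dependencies.

```java
// All names here are hypothetical; a hand-written fake stands in for a mocking library.
interface MembershipSketch {
    void transitionToStale();
}

// Records how often the heartbeat manager reported a stale member.
final class RecordingMembership implements MembershipSketch {
    int staleCalls = 0;

    @Override
    public void transitionToStale() {
        staleCalls++;
    }
}

final class HeartbeatManagerSketch {
    private final MembershipSketch membership;
    private final long pollDeadlineMs;
    private boolean notified = false;

    HeartbeatManagerSketch(MembershipSketch membership, long pollDeadlineMs) {
        this.membership = membership;
        this.pollDeadlineMs = pollDeadlineMs;
    }

    /** Notifies the membership manager exactly once after the poll deadline passes. */
    void poll(long nowMs) {
        if (!notified && nowMs >= pollDeadlineMs) {
            notified = true;
            membership.transitionToStale();
        }
    }
}
```

The state transitions that follow the notification would then be covered by a separate test of the membership manager, as the comment proposes.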
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416609139 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,51 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); +time.sleep(1); +// Sending first heartbeat and transitioning to stable +assertHeartbeat(heartbeatRequestManager); +assertFalse(heartbeatRequestManager.pollTimer().isExpired()); +// Expires the poll timer, ensure sending a leave group +time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); +assertLeaveGroup(heartbeatRequestManager); +assertTrue(heartbeatRequestManager.pollTimer().isExpired()); +// Poll again, ensure we heartbeat again. 
+time.sleep(1); +heartbeatRequestManager.resetPollTimer(); +assertHeartbeat(heartbeatRequestManager); +assertFalse(heartbeatRequestManager.pollTimer().isExpired()); +} + +private void assertHeartbeat(HeartbeatRequestManager hrm) { +System.out.println("assertHeartbeat"); +NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); +assertEquals(1, pollResult.unsentRequests.size()); +assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs); + pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0), +Errors.NONE)); +} + +private void assertLeaveGroup(HeartbeatRequestManager hrm) { +NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); +assertEquals(1, pollResult.unsentRequests.size()); +ConsumerGroupHeartbeatRequestData data = (ConsumerGroupHeartbeatRequestData) pollResult.unsentRequests.get(0).requestBuilder().build().data(); +assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, data.memberEpoch()); +assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); Review Comment: Is this really the right end state when the poll timer expires? This means that the user will have to call `subscribe` to join the group again. Is that what the old coordinator requires? (I expected that the old code just requires the consumer to be polled again to join the group. If my understanding is right, then we're missing logic after sending the last HB to leave: on timer expiration we should send the last HB and transitionToJoining so that the member rejoins the group on the next poll.)
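The flow discussed in this comment (send one last HB on timer expiration, then rejoin on the next poll without a new `subscribe`) can be sketched as a tiny state machine. This is a simplified model under stated assumptions, not the PR's code: `PollTimerSketch` and its methods are hypothetical, the leave epoch of -1 is taken from the review thread, and epoch 0 is assumed here to mean "join as a new member".

```java
import java.util.OptionalInt;

// Hypothetical, simplified model; none of these names are the real Kafka classes.
final class PollTimerSketch {
    enum State { JOINING, STABLE, STALE }

    static final int JOIN_EPOCH = 0;   // assumption: epoch 0 requests a fresh join
    static final int LEAVE_EPOCH = -1; // leave-group epoch for a dynamic member

    private final long maxPollIntervalMs;
    private long deadlineMs;
    private State state = State.JOINING;
    private int memberEpoch = JOIN_EPOCH;

    PollTimerSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    State state() {
        return state;
    }

    /** Application thread: every consumer poll resets the timer and revives a stale member. */
    void onConsumerPoll(long nowMs) {
        deadlineMs = nowMs + maxPollIntervalMs;
        if (state == State.STALE) {
            memberEpoch = JOIN_EPOCH;
            state = State.JOINING;
        }
    }

    /** Background thread: returns the epoch to heartbeat with, or empty to send nothing. */
    OptionalInt onBackgroundTick(long nowMs) {
        if (state == State.STALE) {
            return OptionalInt.empty(); // already left the group; wait for the next poll
        }
        if (nowMs >= deadlineMs) {
            state = State.STALE;
            return OptionalInt.of(LEAVE_EPOCH); // final heartbeat that leaves the group
        }
        return OptionalInt.of(memberEpoch);
    }

    /** Responses that arrive while stale are ignored; the member rejoins on the next poll. */
    void onHeartbeatResponse(int epochFromBroker) {
        if (state == State.STALE) {
            return;
        }
        memberEpoch = epochFromBroker;
        state = State.STABLE;
    }
}
```

In this model the user never has to call `subscribe` again after an expiration; polling is enough to move the member from STALE back to JOINING.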
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416605360 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager { */ private final BackgroundEventHandler backgroundEventHandler; +/** + * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop + * sending heartbeat until the next poll. + */ Review Comment: My take is that we should always send the leave group when the poll timer expires (whether group.instance.id is null or not). Dynamic members should send -1 and static members -2, as described in the KIP. The [documentation of max.poll.interval.ms](https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms) just describes what happens on the broker side: if a static member leaves, the broker won't reassign its partitions until the session timeout expires; if a dynamic member leaves, the broker reassigns them right away.
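As a small illustration of the epoch choice described above, the sketch below picks -1 for dynamic members and -2 for static members. The class and constant names are local to this example (the constants only mirror the values named in the comment), and treating a present `group.instance.id` as "static member" is the assumption the sketch rests on.

```java
import java.util.Optional;

// Illustrative only: the constants mirror the -1 / -2 values named in the comment above.
final class LeaveEpochSketch {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // dynamic member leaving
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; // static member leaving

    private LeaveEpochSketch() { }

    /** Picks the epoch for the final heartbeat; a present group.instance.id means static membership. */
    static int leaveEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
                ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
                : LEAVE_GROUP_MEMBER_EPOCH;
    }

    public static void main(String[] args) {
        System.out.println(leaveEpoch(Optional.empty()));
        System.out.println(leaveEpoch(Optional.of("instance-1")));
    }
}
```

Keeping this decision in one place matches the later agreement in the thread that the membership manager, not the heartbeat manager, should determine the epoch to send.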
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1841786386 Hi @lianetm @cadonna @lucasbru - I've addressed your comments. I've left a question about sending leave group for static members. From reading the KIP, it seems like we should do it, but I think we need further clarification.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416388581 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager { */ private final BackgroundEventHandler backgroundEventHandler; +/** + * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop + * sending heartbeat until the next poll. + */ Review Comment: I think you meant: only when `group.instance.id` is not set, i.e. a dynamic member? This is the current implementation: ``` // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup, // and the membership expiration is only controlled by session timeout. if (isDynamicMember() && !coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId()) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", generation.memberId, coordinator, leaveReason); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder( rebalanceConfig.groupId, Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason))) ); future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation)); client.pollNoWakeup(); } ```
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416383477 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager { */ private final BackgroundEventHandler backgroundEventHandler; +/** + * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop + * sending heartbeat until the next poll. + */ Review Comment: well, i'm not sure actually. According to KIP-848: `Static membership, introduced in KIP-345, is still supported by this new rebalance protocol. When a member wants to leave temporary – e.g. while being bounced – it should send an heartbeat with a member epoch equals to -2. This signals to the group coordinator that the member left but will rejoin within the session timeout. When the member rejoins with the same instance ID, the group coordinator replaces the old member by the new member and gives back its current assignment.`
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416380771 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); Review Comment: it is not possible to transition to fenced from unsubscribed
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416378676 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } +/** + * When consumer polls, we need to reset the pollTimer. If member is already leaving the group + */ +public void ack() { Review Comment: The code has been updated. Note that the pollTimer is updated on a regular cadence by `HeartbeatRequestManager.poll()`. This function checks whether the timer has expired; if it has, we reset the timer every time the consumer is polled.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
cadonna commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1415377837 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -121,11 +127,13 @@ public HeartbeatRequestManager( this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs); this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs); +this.pollTimer = time.timer(rebalanceTimeoutMs); } // Visible for testing HeartbeatRequestManager( final LogContext logContext, +final Time time, Review Comment: Why not passing in directly the poll timer instead of the mock time that is only used to create the poll timer. By passing in the poll timer, you can also remove method `pollTimer()` since that is only needed for testing. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -121,11 +127,13 @@ public HeartbeatRequestManager( this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs); this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs); +this.pollTimer = time.timer(rebalanceTimeoutMs); Review Comment: Yes, please, rename. 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); +// Sending first heartbeat and transitioning to stable +assertHeartbeat(heartbeatRequestManager); +assertFalse(heartbeatRequestManager.pollTimer().isExpired()); +// Expires the poll timer, and ensure heartbeat is not sent +time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); +assertNoHeartbeat(heartbeatRequestManager); Review Comment: As I wrote before, I think the consumer should leave the group. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } +/** + * When consumer polls, we need to reset the pollTimer. If member is already leaving the group + */ +public void ack() { +pollTimer.reset(rebalanceTimeoutMs); +} + +Timer pollTimer() { Review Comment: See my comment above about directly passing the poll timer into the constructor. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -166,11 +175,23 @@ public HeartbeatRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { -if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) { +if (!coordinatorRequestManager.coordinator().isPresent() || +membershipManager.shouldSkipHeartbeat() || +pollTimer.isExpired()) { membershipManager.onHeartbeatRequestSkipped(); return NetworkClientDelegate.PollResult.EMPTY; } +pollTimer.update(currentTimeMs); +if (pollTimer.isExpired()) { +logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + +"was longer than the configured max.poll.interval.ms, which typically implies that " + +"the poll loop is spending too much time processing messages. You can address this " + +"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + +"returned in poll() with
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1414217500 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -166,11 +175,23 @@ public HeartbeatRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { -if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) { +if (!coordinatorRequestManager.coordinator().isPresent() || +membershipManager.shouldSkipHeartbeat() || +pollTimer.isExpired()) { membershipManager.onHeartbeatRequestSkipped(); return NetworkClientDelegate.PollResult.EMPTY; } +pollTimer.update(currentTimeMs); +if (pollTimer.isExpired()) { +logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + +"was longer than the configured max.poll.interval.ms, which typically implies that " + +"the poll loop is spending too much time processing messages. You can address this " + +"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + +"returned in poll() with max.poll.records."); +return PollResult.EMPTY; Review Comment: What about, instead of returning the empty result here, we transition the member to "leaving" (`transitionToSendingLeaveGroup`), and that's it? On the following lines, that should make shouldHbNow true, so we would send the last HB and transition to unsubscribed (effectively leaving without triggering the callbacks, which is what the old logic does).
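The flow proposed in the comment above can be sketched as a tiny state machine. This is a simplified model under stated assumptions, not the PR's implementation: `PollExpirySketch`, its `State` enum, and its `poll` signature are hypothetical, and the sketch ignores the heartbeat interval, coordinator discovery, and response handling. It only shows that an expired poll timer moves the member to a leaving state, which causes exactly one last heartbeat with the leave epoch before the member becomes unsubscribed.

```java
import java.util.OptionalInt;

// Hypothetical, simplified model of "leave on poll timer expiry".
final class PollExpirySketch {
    enum State { STABLE, LEAVING, UNSUBSCRIBED }

    // Leave epoch for a dynamic member, as quoted in this thread.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    private State state = State.STABLE;
    private final long deadlineMs;

    PollExpirySketch(long nowMs, long maxPollIntervalMs) {
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    State state() {
        return state;
    }

    // Returns the member epoch of the heartbeat to send, or empty if none is sent.
    OptionalInt poll(long nowMs, int memberEpoch) {
        if (state == State.UNSUBSCRIBED) {
            return OptionalInt.empty();      // already left: nothing to send
        }
        if (state == State.STABLE && nowMs >= deadlineMs) {
            state = State.LEAVING;           // timer expired: start leaving
        }
        if (state == State.LEAVING) {
            state = State.UNSUBSCRIBED;      // last heartbeat, no callbacks triggered
            return OptionalInt.of(LEAVE_GROUP_MEMBER_EPOCH);
        }
        return OptionalInt.of(memberEpoch);  // regular heartbeat
    }

    public static void main(String[] args) {
        PollExpirySketch member = new PollExpirySketch(0L, 100L);
        System.out.println(member.poll(50L, 3));   // regular heartbeat
        System.out.println(member.poll(100L, 3));  // leave heartbeat
        System.out.println(member.poll(150L, 3));  // nothing sent
    }
}
```

Whether the member should end in an unsubscribed state or go back to joining on the next consumer poll is exactly the open question raised elsewhere in this thread; the sketch takes the first option only for brevity.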
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lucasbru commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1414200098 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } +/** + * When consumer polls, we need to reset the pollTimer. If member is already leaving the group + */ +public void ack() { Review Comment: Is the timer up to date at this point? Otherwise, `deadlineMs` will be set incorrectly. Maybe pass in the current time into this function and update the timer before setting the timeout. Like we do for auto-commit. Also, better name please. `resetPollTimer` ? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -121,11 +127,13 @@ public HeartbeatRequestManager( this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs); this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs); +this.pollTimer = time.timer(rebalanceTimeoutMs); Review Comment: rebalanceTimeoutMs == maxPollIntervalMs ? maybe rename in that case. 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); Review Comment: You could consider using `membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state` instead of changing the visibility of prod code ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -428,9 +428,9 @@ private void updateSubscription(Collection assignedPartitions, * Transition to the {@link MemberState#JOINING} state, indicating that the member will * try to join the group on the next heartbeat request. This is expected to be invoked when * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. - * Visible for testing. Review Comment: I think it's still visible for testing only ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } +/** + * When consumer polls, we need to reset the pollTimer. 
If member is already leaving the group + */ +public void ack() { +pollTimer.reset(rebalanceTimeoutMs); +} + +Timer pollTimer() { Review Comment: // visible for testing
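The concern in the review above about `deadlineMs` being set from a stale time can be illustrated with a small stand-in timer. This is a sketch under assumptions: `PollTimerSketch` is a hypothetical class that only mimics a timer which caches the current time and computes its deadline from that cached value; it is not Kafka's `Timer`. The method name `resetPollTimer` follows the rename suggested in the review, and its signature is assumed.

```java
// Hypothetical stand-in for a timer that caches the current time.
final class PollTimerSketch {
    private long currentTimeMs;
    private long deadlineMs;

    PollTimerSketch(long nowMs, long timeoutMs) {
        this.currentTimeMs = nowMs;
        this.deadlineMs = nowMs + timeoutMs;
    }

    // Advance the cached time; time never moves backwards.
    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    // The new deadline is computed from the cached time, which may be stale.
    void reset(long timeoutMs) {
        deadlineMs = currentTimeMs + timeoutMs;
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    long deadlineMs() {
        return deadlineMs;
    }

    // Suggested shape: refresh the cached time first, then reset.
    void resetPollTimer(long nowMs, long maxPollIntervalMs) {
        update(nowMs);
        reset(maxPollIntervalMs);
    }

    public static void main(String[] args) {
        PollTimerSketch stale = new PollTimerSketch(0L, 100L);
        stale.reset(100L);                 // consumer polled at t=80, but timer still thinks t=0
        System.out.println(stale.deadlineMs());

        PollTimerSketch fresh = new PollTimerSketch(0L, 100L);
        fresh.resetPollTimer(80L, 100L);   // cached time refreshed before the reset
        System.out.println(fresh.deadlineMs());
    }
}
```

In the stale case the deadline stays at 100 even though the consumer polled at t=80, so the timer would expire 20 ms later instead of a full interval later; updating first moves the deadline to 180.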
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1837908142 Hi @lianetm @lucasbru, can I ask for a review of this PR? Modifications were made based on our discussion.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1832745572 In the current implementation, when the poll timer expires, the consumer sends a leave group request without revoking the partitions. This is a bit different from the current state transition, so I'm not sure what the right thing to do is.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1409899121 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } +pollTimer.update(currentTimeMs); +if (!isLeaveGroupInProgress() && pollTimer.isExpired()) { +logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + +"was longer than the configured max.poll.interval.ms, which typically implies that " + +"the poll loop is spending too much time processing messages. You can address this " + +"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + +"returned in poll() with max.poll.records."); +membershipManager.leaveGroup().whenComplete((ignored, exception) -> { Review Comment: What exactly should we do under KIP-848 if the poll timer expires? The current implementation sends a leave group request directly without revoking partitions. If we want to do that, we need to modify the state transition a bit.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1409862992 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } +pollTimer.update(currentTimeMs); +if (!isLeaveGroupInProgress() && pollTimer.isExpired()) { +logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + +"was longer than the configured max.poll.interval.ms, which typically implies that " + +"the poll loop is spending too much time processing messages. You can address this " + +"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + +"returned in poll() with max.poll.records."); +membershipManager.leaveGroup().whenComplete((ignored, exception) -> { Review Comment: Not sure if KIP-848 changes this, but ``` // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup, // and the membership expiration is only controlled by session timeout. ```
[PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee opened a new pull request, #14873: URL: https://github.com/apache/kafka/pull/14873 Currently, the poll interval is not respected during consumer#poll. We need to make sure the consumer actively leaves the group when the user doesn't poll frequently enough. This PR adds 1. a poll timer and 2. a boolean flag that triggers rejoining on the subsequent poll. - The poll timer is always configured with rebalanceTimeout - The boolean flag, rejoinOnPoll, is set when the leaveGroup completes, so that the subsequent poll rejoins the group.