Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
cadonna merged PR #15698:
URL: https://github.com/apache/kafka/pull/15698

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2077313831

Thanks for the helpful comments @cadonna, all addressed.

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579555839

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -469,19 +469,33 @@ public void resetTimer() {
             this.heartbeatTimer.reset(heartbeatIntervalMs);
         }
+        /**
+         * Check if a heartbeat request should be sent on the current time. A heartbeat should be
+         * sent if the heartbeat timer has expired, backoff has expired, and there is no request
+         * in-flight.
+         */
         @Override
         public boolean canSendRequest(final long currentTimeMs) {
             update(currentTimeMs);
             return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
         }
-        public long nextHeartbeatMs(final long currentTimeMs) {
+        public long timeToNextHeartbeatMs(final long currentTimeMs) {
             if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
They achieve the same here, and I totally agree that `isExpired` is more readable; fixed. (The "since we're here..." cleanup makes sense to me too, btw.)
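For reference, a minimal sketch of why the two checks agree. It uses a hypothetical `SimpleTimer` that mirrors our understanding of the relevant parts of Kafka's `org.apache.kafka.common.utils.Timer` (an assumption, simplified): `isExpired()` tests `currentTimeMs >= deadlineMs`, and `remainingMs()` is `deadlineMs - currentTimeMs` clamped at zero, so `remainingMs() == 0` holds exactly when the timer is expired.

```java
// Hypothetical, simplified timer; assumed to mirror the relevant semantics
// of Kafka's org.apache.kafka.common.utils.Timer (not the real class).
final class SimpleTimer {
    private long currentTimeMs;
    private long deadlineMs;

    SimpleTimer(long nowMs, long timeoutMs) {
        this.currentTimeMs = nowMs;
        reset(timeoutMs);
    }

    // Time never moves backwards.
    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    // The deadline is measured from the timer's current time; saturate on overflow.
    void reset(long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Invalid negative timeout " + timeoutMs);
        }
        deadlineMs = currentTimeMs > Long.MAX_VALUE - timeoutMs
            ? Long.MAX_VALUE
            : currentTimeMs + timeoutMs;
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    long remainingMs() {
        // Clamped at zero, so it is 0 exactly when currentTimeMs >= deadlineMs.
        return Math.max(0, deadlineMs - currentTimeMs);
    }
}

final class TimerEquivalenceDemo {
    public static void main(String[] args) {
        SimpleTimer timer = new SimpleTimer(0, 1000);
        for (long now : new long[] {0, 999, 1000, 1500}) {
            timer.update(now);
            System.out.println(now + ": isExpired=" + timer.isExpired()
                + ", remainingMs()==0 is " + (timer.remainingMs() == 0));
        }
    }
}
```

Both values switch to `true` together at 1000 ms, which is why replacing `remainingMs() == 0` with `isExpired()` is a pure readability change under these semantics.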

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579555219

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
This relates to your comment above. You're right that we did not have a check to ensure the reset happened on the send and not on the response, so I added a check on `timeToNextHeartbeatMs` above that fails, with a specific message, if the timer is not reset on the send. That check covers it, but I also added the steps you suggested: advance the time a bit, check that no HB is sent, and check that the time to the next HB is reduced by the difference. All done.

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");

Review Comment:
Sure! I missed it. Added.

cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
How do you know whether the heartbeat timer was reset in `makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you complete the future of the request? I would verify the timer reset after the poll, progress the time a bit (less than the heartbeat interval), and then verify here that the time to the next heartbeat is the heartbeat interval minus the amount of time I progressed the time after the poll.
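The check suggested above can be sketched with a toy model. `ToyHeartbeatState`, `ResetPoint`, and `resetHappensOnSend` are hypothetical names, not the real `HeartbeatRequestState` or test helpers; the enum lets the same check run against three possible implementations. Verifying right after the send catches a reset that only happens on the response, and progressing the time by less than one interval before completing the response additionally catches an unwanted second reset on the response.

```java
// Hypothetical toy model, only to illustrate the shape of the test.
enum ResetPoint { ON_SEND, ON_RESPONSE, ON_BOTH }

final class ToyHeartbeatState {
    private final long intervalMs;
    private final ResetPoint resetPoint;
    private long nextDueMs = 0; // due immediately at startup

    ToyHeartbeatState(long intervalMs, ResetPoint resetPoint) {
        this.intervalMs = intervalMs;
        this.resetPoint = resetPoint;
    }

    long timeToNextHeartbeatMs(long nowMs) {
        return Math.max(0, nextDueMs - nowMs);
    }

    void onSend(long nowMs) {
        if (resetPoint != ResetPoint.ON_RESPONSE) nextDueMs = nowMs + intervalMs;
    }

    void onResponse(long nowMs) {
        if (resetPoint != ResetPoint.ON_SEND) nextDueMs = nowMs + intervalMs;
    }
}

final class ResetOnSendCheck {
    // Returns true only if the interval is measured from the send.
    static boolean resetHappensOnSend(ToyHeartbeatState state, long intervalMs) {
        long nowMs = 0;
        state.onSend(nowMs);
        // 1) Right after the send (before any response), the full interval must remain.
        if (state.timeToNextHeartbeatMs(nowMs) != intervalMs) return false;
        // 2) Progress time by less than one interval, then complete the response.
        long progressMs = intervalMs / 4;
        nowMs += progressMs;
        state.onResponse(nowMs);
        // 3) The response must not restart the interval.
        return state.timeToNextHeartbeatMs(nowMs) == intervalMs - progressMs;
    }

    public static void main(String[] args) {
        long intervalMs = 1000;
        for (ResetPoint p : ResetPoint.values()) {
            boolean ok = resetHappensOnSend(new ToyHeartbeatState(intervalMs, p), intervalMs);
            System.out.println(p + " -> " + ok);
        }
    }
}
```

Only `ON_SEND` passes; `ON_RESPONSE` fails at step 1 and `ON_BOTH` fails at step 3, which is the extra coverage the time-progress step buys.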

cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
How do you know whether the heartbeat timer was reset in `makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you complete the future of the request? I would verify the reset after the poll, progress the time a bit (less than the heartbeat interval), and then verify here that the time did not change.

cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
How do you know whether the heartbeat timer was reset in `makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you complete the future of the request? I would verify the reset after the poll, progress the time a bit (less than the heartbeat interval), and then verify here that the time to the next heartbeat did not change.

cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579187392

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");

Review Comment:
Don't you need a verification here that ensures that the heartbeat timer was reset after the poll?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
How do you know whether the heartbeat timer was reset in `makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you complete the future of the request? I would verify the reset after the poll, progress the time a bit (less than the heartbeat interval), and then verify here that the time did not change after the progress.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -469,19 +469,33 @@ public void resetTimer() {
             this.heartbeatTimer.reset(heartbeatIntervalMs);
         }
+        /**
+         * Check if a heartbeat request should be sent on the current time. A heartbeat should be
+         * sent if the heartbeat timer has expired, backoff has expired, and there is no request
+         * in-flight.
+         */
         @Override
         public boolean canSendRequest(final long currentTimeMs) {
             update(currentTimeMs);
             return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
         }
-        public long nextHeartbeatMs(final long currentTimeMs) {
+        public long timeToNextHeartbeatMs(final long currentTimeMs) {
             if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
Sorry if I comment on code outside the PR. Isn't this the same as `heartbeatTimer.isExpired()`? If yes, could we please change this to make the code more readable?

lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2069725394

Hi @cadonna, thanks for the comments!

- The unit test I added initially fails on [this](https://github.com/apache/kafka/blob/d242c444562fe4be41a2bc53d47ab2a6f523b955/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L276) assertion without the changes in this PR. I guess that's what you were looking for with your first comment?
- Agreed, we did not have test coverage to make sure that `canSendRequest` and `nextHeartbeat` move along consistently. I added a function to validate them together, included it in the existing test that covers responses with errors, and added a new test to cover the successful path too.
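A minimal sketch of the kind of consistency check described here. `ToyTiming` and `assertTimingConsistent` are hypothetical names and are not the helper added in the PR; the toy combines an interval timer with a fixed retry backoff and does not model in-flight requests. The property checked: while the computed wait (`timeToNextHeartbeatMs`, the later name of `nextHeartbeatMs`) is positive, sending must be refused, sending must be allowed exactly once that wait has elapsed, and not one millisecond earlier.

```java
// Hypothetical toy state: interval timer plus a fixed retry backoff, no in-flight tracking.
final class ToyTiming {
    private final long intervalMs;
    private final long backoffMs;
    private long timerDeadlineMs = 0;
    private long backoffDeadlineMs = 0;

    ToyTiming(long intervalMs, long backoffMs) {
        this.intervalMs = intervalMs;
        this.backoffMs = backoffMs;
    }

    boolean canSendRequest(long nowMs) {
        return nowMs >= timerDeadlineMs && nowMs >= backoffDeadlineMs;
    }

    long timeToNextHeartbeatMs(long nowMs) {
        // Wait for whichever of the interval and the backoff ends last.
        return Math.max(0, Math.max(timerDeadlineMs, backoffDeadlineMs) - nowMs);
    }

    void onSent(long nowMs) {
        timerDeadlineMs = nowMs + intervalMs;
    }

    void onFailed(long nowMs) {
        timerDeadlineMs = nowMs;               // no interval to wait for after a failure
        backoffDeadlineMs = nowMs + backoffMs; // but respect the retry backoff
    }
}

final class TimingConsistency {
    // Throws if the two views disagree at nowMs.
    static void assertTimingConsistent(ToyTiming timing, long nowMs) {
        long waitMs = timing.timeToNextHeartbeatMs(nowMs);
        if (waitMs > 0 && timing.canSendRequest(nowMs)) {
            throw new AssertionError("Can send at " + nowMs + " but wait is " + waitMs + " ms");
        }
        if (!timing.canSendRequest(nowMs + waitMs)) {
            throw new AssertionError("Still cannot send after waiting " + waitMs + " ms");
        }
        if (waitMs > 0 && timing.canSendRequest(nowMs + waitMs - 1)) {
            throw new AssertionError("Could already send 1 ms before the computed wait");
        }
    }

    public static void main(String[] args) {
        ToyTiming timing = new ToyTiming(1000, 100);
        timing.onSent(0);
        assertTimingConsistent(timing, 250); // successful path: waiting for the interval
        timing.onFailed(300);
        assertTimingConsistent(timing, 300); // error path: waiting for the backoff
        assertTimingConsistent(timing, 400); // backoff elapsed
        System.out.println("consistent");
    }
}
```

Running the same check on both the success and the error paths mirrors the split described in the comment (existing error-handling test plus a new successful-path test).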

cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1574708196

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+            // needed with backoff (ex. errors that lead to retries, like coordinator load error),
+            // or immediately (ex. errors that lead to rejoining, like fencing errors).
+            heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
Sorry, my previous comment is not correct. The `heartbeatTimer` is expired when you call `heartbeatTimer.reset(0)`.

cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1574660830

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+            // needed with backoff (ex. errors that lead to retries, like coordinator load error),
+            // or immediately (ex. errors that lead to rejoining, like fencing errors).
+            heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
Your change to `heartbeatTimer.reset(0)` is not totally equivalent to `heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());` because in the former the `heartbeatTimer` is not expired until `heartbeatTimer.update()` is called, whereas in the latter the `heartbeatTimer` is expired right after the call. But I think in this specific case it does not matter. Is my assumption correct?
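A small sketch comparing the two ways of expiring the timer. It uses a hypothetical `SimpleTimer` that, as we understand Kafka's `Timer`, computes the deadline from the timer's own current time and ignores updates to an earlier time (treat both as assumptions). Both variants leave the timer expired immediately, as the follow-up comment notes. The `update(...)` variant also pushes the timer's clock ahead of the real time, so under these assumptions a later `reset(interval)` measures the interval from that future point.

```java
// Hypothetical, simplified timer (assumed semantics, not Kafka's class).
final class SimpleTimer {
    private long currentTimeMs;
    private long deadlineMs;

    SimpleTimer(long nowMs, long timeoutMs) {
        currentTimeMs = nowMs;
        reset(timeoutMs);
    }

    void update(long nowMs) { currentTimeMs = Math.max(currentTimeMs, nowMs); } // never goes back
    void reset(long timeoutMs) { deadlineMs = currentTimeMs + timeoutMs; }      // no overflow guard here
    long currentTimeMs() { return currentTimeMs; }
    long remainingMs() { return Math.max(0, deadlineMs - currentTimeMs); }
    boolean isExpired() { return currentTimeMs >= deadlineMs; }
}

final class ExpireTimerDemo {
    public static void main(String[] args) {
        long wallClockMs = 100;
        long intervalMs = 1000;

        // Variant 1: reset(0) sets deadline == current time, so it is expired right away.
        SimpleTimer viaReset = new SimpleTimer(wallClockMs, intervalMs);
        viaReset.reset(0);

        // Variant 2: jump the timer's clock forward to its deadline.
        SimpleTimer viaUpdate = new SimpleTimer(wallClockMs, intervalMs);
        viaUpdate.update(viaUpdate.currentTimeMs() + viaUpdate.remainingMs());

        System.out.println("expired: " + viaReset.isExpired() + " / " + viaUpdate.isExpired());
        System.out.println("timer clock: " + viaReset.currentTimeMs() + " / " + viaUpdate.currentTimeMs());

        // Later, a heartbeat is sent at wall-clock 200 and the interval restarts.
        for (SimpleTimer t : new SimpleTimer[] {viaReset, viaUpdate}) {
            t.update(200);
            t.reset(intervalMs);
            t.update(1200); // one full interval after the send
        }
        System.out.println("expired one interval after send: "
            + viaReset.isExpired() + " / " + viaUpdate.isExpired());
    }
}
```

This prints `expired: true / true`, `timer clock: 100 / 1100`, and `expired one interval after send: true / false`: in the `update(...)` variant the restarted interval is stretched by the 900 ms the clock was pushed ahead, which is one reason `reset(0)` expresses the intent more safely under these assumptions.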

lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2064258399

Hey @cadonna, could you take a look when you have a chance? Thanks!

lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2062253184

Thanks for the comments @kirktrue, all addressed. Regarding your comment about tests [here](https://github.com/apache/kafka/pull/15698#pullrequestreview-2004676007), we have that covered by the existing `testHeartbeatResponseOnErrorHandling`. That test validates that we get the right `nextHeartbeatMs` time (which takes the timer into account) for each specific error type. Is that what you were looking for? Thanks!

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569511540

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
+    @Test
+    public void testHeartbeatNotSentIfAnotherOneInFlight() {
+        mockStableMember();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        // Heartbeat sent (no response received)
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
+            "previous on in-flight");

Review Comment:
Fixed (not nit-picky at all, ugly typo, good catch!)

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569504137

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Reset timer to allow sending HB after a failure without waiting for the interval.
+            // After a failure, a next HB may be needed with backoff (ex. errors that lead to
+            // retries, like coordinator load error), or immediately (ex. errors that lead to
+            // rejoining, like fencing errors).
+            heartbeatTimer.reset(0);
+            super.onFailedAttempt(currentTimeMs);

Review Comment:
No, it doesn't. The timer only indicates that an interval should be respected. In case of failures, we don't want to follow the interval (so we reset the timer to 0). Depending on the error, we will either:
- send the next HB based on other conditions (ex. as soon as the coordinator is discovered, or when releasing the assignment finishes after getting fenced), or
- not send a next HB at all (fatal errors).
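A minimal sketch of the idea, with hypothetical names. A fixed backoff stands in for the request manager's retry backoff (the real base class may use a different, for example exponential, backoff), and the "other conditions" mentioned above, such as coordinator discovery, are not modelled. On a failed attempt the interval timer is reset to zero and only the backoff remains, so a retriable error can be retried after the backoff rather than after a full interval, while a fatal error stops heartbeats altogether.

```java
// Hypothetical sketch of failure handling; names and the fixed backoff are assumptions.
final class FailureAwareHeartbeat {
    private final long intervalMs;
    private final long retryBackoffMs;
    private long timerDeadlineMs = 0;   // interval timer
    private long backoffDeadlineMs = 0; // retry backoff
    private boolean inFlight = false;
    private boolean fatal = false;

    FailureAwareHeartbeat(long intervalMs, long retryBackoffMs) {
        this.intervalMs = intervalMs;
        this.retryBackoffMs = retryBackoffMs;
    }

    // Timer expired, backoff expired, nothing in flight (and no fatal error seen).
    boolean canSendRequest(long nowMs) {
        return !fatal && !inFlight && nowMs >= timerDeadlineMs && nowMs >= backoffDeadlineMs;
    }

    void onSend(long nowMs) {
        inFlight = true;
        timerDeadlineMs = nowMs + intervalMs; // interval measured from the send
    }

    void onSuccess() {
        inFlight = false;
    }

    void onFailedAttempt(long nowMs, boolean fatalError) {
        inFlight = false;
        timerDeadlineMs = nowMs;                    // like reset(0): no interval to wait for
        backoffDeadlineMs = nowMs + retryBackoffMs; // only the backoff applies
        fatal = fatalError;
    }
}

final class FailureDemo {
    public static void main(String[] args) {
        FailureAwareHeartbeat hb = new FailureAwareHeartbeat(3000, 100);
        hb.onSend(0);
        hb.onFailedAttempt(50, false);              // e.g. a retriable coordinator error
        System.out.println(hb.canSendRequest(149)); // still backing off
        System.out.println(hb.canSendRequest(150)); // retry allowed long before 3000 ms

        hb.onSend(150);
        hb.onFailedAttempt(200, true);              // fatal: never send again
        System.out.println(hb.canSendRequest(10_000));
    }
}
```

This prints `false`, `true`, `false`.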

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569498394

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
                 break;
             case UNRELEASED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}",
+                logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",

Review Comment:
Both. The change to the default log is needed: it did not include the errorMessage, which makes it hard to know what happened when you get errors like INVALID_REQUEST (I personally hit it and lost time investigating, so I fixed it). The other log changes are just improvements since I was already there.

kirktrue commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1567992301

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Reset timer to allow sending HB after a failure without waiting for the interval.
+            // After a failure, a next HB may be needed with backoff (ex. errors that lead to
+            // retries, like coordinator load error), or immediately (ex. errors that lead to
+            // rejoining, like fencing errors).
+            heartbeatTimer.reset(0);
+            super.onFailedAttempt(currentTimeMs);

Review Comment:
Does the decision to reset the heartbeat timer depend on what _type_ of error is received?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
                 break;
             case UNRELEASED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}",
+                logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",

Review Comment:
QQ: are these logging changes of the ‘I'll just clean this up as long as I'm in here’ variety, or do they have some bearing on the correctness of the logs?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
+    @Test
+    public void testHeartbeatNotSentIfAnotherOneInFlight() {
+        mockStableMember();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        // Heartbeat sent (no response received)
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
+            "previous on in-flight");

Review Comment:
Super nit-picky, sorry
```suggestion
            "previous one in-flight");
```
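The scenario exercised by this test can be sketched with a hypothetical toy poller (`InFlightGate` is not the real `HeartbeatRequestManager`): a heartbeat is sent only when the interval has expired and no previous request is in flight, and the interval restarts on the send. The toy ignores the retry backoff that the real test also waits out after the response.

```java
// Hypothetical toy poller illustrating the in-flight gate.
final class InFlightGate {
    private final long intervalMs;
    private long nextDueMs;
    private boolean inFlight;

    InFlightGate(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.nextDueMs = startMs + intervalMs; // first heartbeat after one interval
    }

    // Returns true if a heartbeat is sent on this poll.
    boolean poll(long nowMs) {
        if (inFlight || nowMs < nextDueMs) {
            return false;
        }
        inFlight = true;
        nextDueMs = nowMs + intervalMs; // interval restarts on send
        return true;
    }

    void onResponse() {
        inFlight = false;
    }

    public static void main(String[] args) {
        InFlightGate gate = new InFlightGate(1000, 0);
        System.out.println(gate.poll(1000)); // interval expired: sent
        System.out.println(gate.poll(1000)); // previous one in flight: not sent
        System.out.println(gate.poll(2000)); // interval expired again, still in flight: not sent
        gate.onResponse();
        System.out.println(gate.poll(2000)); // response received, interval expired: sent
    }
}
```

This prints `true`, `false`, `false`, `true`, matching the sequence of `unsentRequests.size()` assertions in the test.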

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565783934

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
         heartbeatRequestState.onSendAttempt(currentTimeMs);
         membershipManager.onHeartbeatRequestSent();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
+        // Reset timer when sending the request, to make sure that, if waiting for the interval,
+        // we don't include the response time (which may introduce delay)

Review Comment:
You're right, I don't think it adds anything that isn't already clear from the function name and the action itself. Removed.
This is covered in the new test I added [here](https://github.com/apache/kafka/blob/fe483ff816b62133291f77f29b00e3bc706b581f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L258). I have now added an assert message along the lines of this comment to make it clearer in the test.
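A back-of-the-envelope sketch of why the reset point matters, under simplified assumptions: a poll happens exactly when a heartbeat becomes due, every response takes a constant `latencyMs`, and a new heartbeat also waits for the previous response. `HeartbeatSpacing.sendTimes` is a hypothetical helper, not Kafka code. Resetting on the send keeps heartbeats one interval apart; resetting on the response stretches every gap by the response time.

```java
import java.util.Arrays;

final class HeartbeatSpacing {
    // Computes when each of `count` heartbeats is sent (all times in ms).
    static long[] sendTimes(long intervalMs, long latencyMs, boolean resetOnSend, int count) {
        long[] sends = new long[count];
        long nextDueMs = 0;
        long lastResponseMs = 0;
        for (int i = 0; i < count; i++) {
            // Wait for the interval timer and for the previous response (no overlap).
            long sentAtMs = Math.max(nextDueMs, lastResponseMs);
            sends[i] = sentAtMs;
            lastResponseMs = sentAtMs + latencyMs;
            // The only difference between the two strategies: where the timer restarts.
            nextDueMs = (resetOnSend ? sentAtMs : lastResponseMs) + intervalMs;
        }
        return sends;
    }

    public static void main(String[] args) {
        System.out.println("reset on send:     " + Arrays.toString(sendTimes(1000, 200, true, 4)));
        System.out.println("reset on response: " + Arrays.toString(sendTimes(1000, 200, false, 4)));
    }
}
```

With a 1000 ms interval and 200 ms responses this prints `[0, 1000, 2000, 3000]` for reset on send and `[0, 1200, 2400, 3600]` for reset on response.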

lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2056883089

Hey @cadonna, thanks a lot for your feedback! All comments addressed.

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565815246

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+            // needed with backoff (ex. errors that lead to retries, like coordinator load error),
+            // or immediately (ex. errors that lead to rejoining, like fencing errors).
+            heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
Done, good point. I changed it to reset to 0, which shows the intention of not having an interval to wait for, which is what we want in these failure scenarios.

lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565783934

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
         heartbeatRequestState.onSendAttempt(currentTimeMs);
         membershipManager.onHeartbeatRequestSent();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
+        // Reset timer when sending the request, to make sure that, if waiting for the interval,
+        // we don't include the response time (which may introduce delay)

Review Comment:
You're right, I don't think it adds anything that isn't already clear from the function name and the action itself. Removed.
This is covered in the new test I added. I have now added an assert message along the lines of this comment to make it clearer in the test.

cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1562500382

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
         heartbeatRequestState.onSendAttempt(currentTimeMs);
         membershipManager.onHeartbeatRequestSent();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
+        // Reset timer when sending the request, to make sure that, if waiting for the interval,
+        // we don't include the response time (which may introduce delay)

Review Comment:
Do we really need this comment?
Additionally, I could not find a verification of this call in the unit tests. Since you added a comment, it seems important enough to warrant a verification.

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
+    @Test
+    public void testHeartbeatNotSentIfAnotherOnInFlight() {
+        mockStableMember();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        // Heartbeat sent (no response received)
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
+            "previous on in-flight");
+
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " +
+            "interval expires if there is a previous HB request in-flight");
+
+        // Receive response for the inflight. The next HB should be sent on the next poll after
+        // the interval expires.
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        time.sleep(DEFAULT_RETRY_BACKOFF_MS);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+

Review Comment:
```suggestion
```

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
+    @Test
+    public void testHeartbeatNotSentIfAnotherOnInFlight() {

Review Comment:
typo
```suggestion
    public void testHeartbeatNotSentIfAnotherOneInFlight() {
```

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+            // needed with backoff (ex. errors that lead to retries, like coordinator load error),
+            // or immediately (ex. errors that lead to rejoining, like fencing errors).
+            heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
What about adding a method to `Timer` that expires the timer without updating it with a time point in the future? Alternatively, I think you could reset the `Timer` to 0 with `heartbeatTimer.reset(0)`.
Do we need a verification in the unit tests for this?

lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2050401151

Hey @cadonna, could you take a look at this small fix when you have a chance? Thanks!