Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna merged PR #14532: URL: https://github.com/apache/kafka/pull/14532 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on PR #14532: URL: https://github.com/apache/kafka/pull/14532#issuecomment-1772246862 Build failures are unrelated.
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on PR #14532: URL: https://github.com/apache/kafka/pull/14532#issuecomment-1771337422 Thank you @cadonna - I adapted your suggestion. Thanks again!
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1365517149 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java: ## @@ -97,6 +99,29 @@ public void testTimeoutAfterSend() throws Exception { } } +@Test +public void testEnsureCorrectCompletionTimeOnFailure() { +NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); +long timeMs = time.milliseconds(); +unsentRequest.handler().onFailure(timeMs, new TimeoutException()); + +time.sleep(100); +assertEquals(timeMs, unsentRequest.handler().completionTimeMs()); +} + +@Test +public void testEnsureCorrectCompletionTimeOnComplete() throws IOException { +NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); +prepareFindCoordinatorResponse(Errors.NONE); +long timeMs = time.milliseconds(); +try (NetworkClientDelegate delegate = newNetworkClientDelegate()) { +delegate.send(unsentRequest); +delegate.poll(0, timeMs); +} +time.sleep(100); +assertEquals(timeMs, unsentRequest.handler().completionTimeMs()); +} +
Review Comment: If you use a mock for the client response, the test becomes simpler.
```suggestion
@Test
public void testEnsureCorrectCompletionTimeOnComplete() {
    NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
    long timeMs = time.milliseconds();
    final ClientResponse response = mock(ClientResponse.class);
    when(response.receivedTimeMs()).thenReturn(timeMs);
    unsentRequest.handler().onComplete(response);
    time.sleep(100);
    assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
}
```
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1365044416 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { new ArrayList<>(this.requestedPartitions), throwOnFetchStableOffsetUnsupported); return new NetworkClientDelegate.UnsentRequest( -builder, -coordinatorRequestManager.coordinator(), -(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); +builder, +coordinatorRequestManager.coordinator(), +(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); Review Comment: Of course, mine was a question for my understanding. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on PR #14532: URL: https://github.com/apache/kafka/pull/14532#issuecomment-1769876947 Hi @cadonna, thank you for taking the time to review the PR again. I have answered your comments and attached a follow-up ticket and a minor PR: https://github.com/apache/kafka/pull/14581 That PR is based on this one, so its commit history looks a bit messy; once this PR is merged, I will rebase it. Let me know if you have more questions about this PR; I would be happy to address them.
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364845881 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); Review Comment: Thanks. On using `request.handler().completionTimeMs()` instead of `response.receivedTimeMs()`: this is addressed. On `request.handler()` not being usable when the completion handler is passed into the `UnsentRequest` constructor: let me quickly follow up with a MINOR PR to address this comment.
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364842391 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java: ## @@ -151,6 +152,36 @@ void testHardFailures(Exception exception) { } } +@Test +public void testNetworkTimeout() { Review Comment: Not entirely sure about the behavior of the OffsetRequestManager - I'll need to get back to this. Added a ticket there: KAFKA-15642 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364833452 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java: ## @@ -151,6 +152,36 @@ void testHardFailures(Exception exception) { } } +@Test +public void testNetworkTimeout() { Review Comment: CoordinatorRequestManager doesn't have it - so I added the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364830057 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java: ## @@ -151,6 +152,36 @@ void testHardFailures(Exception exception) { } } +@Test +public void testNetworkTimeout() { Review Comment: Per the previous comment: the network timeout isn't handled correctly in the `CommitRequestManager`, so I want to address it in that PR. I will check the other request managers to ensure this is tested.
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364829502 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { new ArrayList<>(this.requestedPartitions), throwOnFetchStableOffsetUnsupported); return new NetworkClientDelegate.UnsentRequest( -builder, -coordinatorRequestManager.coordinator(), -(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); +builder, +coordinatorRequestManager.coordinator(), +(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); Review Comment: In fact, this is a bug and I'm planning to address it in [KAFKA-15562](https://issues.apache.org/jira/browse/KAFKA-15562). Can we address all the gaps in a follow-up PR?
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1364820542 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -256,27 +256,39 @@ public String toString() { public static class FutureCompletionHandler implements RequestCompletionHandler { +private long responseCompletionTimeMs; private final CompletableFuture future; FutureCompletionHandler() { -this.future = new CompletableFuture<>(); +future = new CompletableFuture<>(); } -public void onFailure(final RuntimeException e) { -future.completeExceptionally(e); +public void onFailure(final long currentTimeMs, final RuntimeException e) { Review Comment: You are right, I should add one there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1363336947 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); Review Comment: nit: I was wondering whether you should use `request.handler().completionTimeMs()` instead of `response.receivedTimeMs()` here and in other places. IMO, it would make the calls clearer because `request.handler()` would be the single component that manages the completion time. I also realized that if the completion handler is passed into the constructor of `UnsentRequest` using `request.handler()` would not be possible. I was also thinking that maybe you should get rid of that constructor because it makes the code less readable. Those are just nit comments to think about. They do not block merging this PR. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -256,27 +256,39 @@ public String toString() { public static class FutureCompletionHandler implements RequestCompletionHandler { +private long responseCompletionTimeMs; private final CompletableFuture future; FutureCompletionHandler() { -this.future = new CompletableFuture<>(); +future = new CompletableFuture<>(); } -public void onFailure(final RuntimeException e) { -future.completeExceptionally(e); +public void onFailure(final long currentTimeMs, final RuntimeException e) { Review Comment: I could not find unit tests to verify the setting of the correct completion time in `onFailure()` and `onComplete()`. 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java: ## @@ -151,6 +152,36 @@ void testHardFailures(Exception exception) { } } +@Test +public void testNetworkTimeout() { Review Comment: I expected such a test in the other request manager tests as well. The only other request manager test that has such a test is `HeartbeatRequestManager`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -148,30 +148,33 @@ private Optional send(final long currentTim private NetworkClientDelegate.UnsentRequest createUnsentRequest( final MetadataRequest.Builder request) { -return new NetworkClientDelegate.UnsentRequest( -request, -Optional.empty(), -this::processResponseOrException -); -} +NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest( +request, +Optional.empty()); + +unsent.future().whenComplete((response, exception) -> { +if (response == null) { +// Backoff if the error is retriable Review Comment: I think this is not the right place for this comment because there is no concept of a backoff here. I think you should remove it. (Maybe you realised by now that I am not a big fan of inline comments. I think they eventually start to lie. In this case it is even very likely because you cannot see the code to which this comment refers. If something changes in `handleError()`, it is very unlikely that this comment will be updated.)
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { new ArrayList<>(this.requestedPartitions), throwOnFetchStableOffsetUnsupported); return new NetworkClientDelegate.UnsentRequest( -builder, -coordinatorRequestManager.coordinator(), -(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); +builder, +coordinatorRequestManager.coordinator(), +(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); Review Comment: Out of curiosity, can the response never be `null` here as in `HeartbeatRequestManager`? Can timeouts not happen here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1363231692 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), coordinatorRequestManager.coordinator()); -request.future().whenComplete((response, exception) -> { +request.handler().whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); } else { -// TODO: Currently, we lack a good way to propage the response time from the network client to the -// request handler. We will need to store the response time in the handler to make it accessible. -onFailure(exception, time.milliseconds()); +onFailure(exception, request.handler().completionTimeMs()); Review Comment: OK, this was a misunderstanding on my side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1361282001 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), coordinatorRequestManager.coordinator()); -request.future().whenComplete((response, exception) -> { +request.handler().whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); } else { -// TODO: Currently, we lack a good way to propage the response time from the network client to the -// request handler. We will need to store the response time in the handler to make it accessible. -onFailure(exception, time.milliseconds()); +onFailure(exception, request.handler().completionTimeMs()); Review Comment: The future may be completed exceptionally with an exception, or completed normally with a response that carries an error code. Is this where the confusion comes from? A retriable error can surface either in the response or as the throwable. Should I make `onComplete` accept both the error and the response, like `void onComplete(ClientResponse response, Throwable error);`, to be more consistent?
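Editorial sketch of the two failure channels described in the comment above: a future that completes exceptionally versus one that completes normally with a response carrying an error code. This is a minimal illustration under stated assumptions, not the Kafka code: `Response`, its `errorCode` field, and `describe()` are hypothetical names, and only `CompletableFuture` is a real API here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class FailureChannelsSketch {
    /** Hypothetical stand-in for a client response; "NONE" means no error. */
    static final class Response {
        final String errorCode;
        Response(String errorCode) { this.errorCode = errorCode; }
    }

    /** Classifies an already-completed future the way a whenComplete callback would see it. */
    static String describe(CompletableFuture<Response> future) {
        StringBuilder outcome = new StringBuilder();
        // On an already-completed future, whenComplete runs the callback immediately.
        future.whenComplete((response, exception) -> {
            if (response == null) {
                // Channel 1: exceptional completion (for example a send timeout).
                outcome.append("exception:").append(exception.getClass().getSimpleName());
            } else if (!"NONE".equals(response.errorCode)) {
                // Channel 2: normal completion, but the response reports an error.
                outcome.append("error-code:").append(response.errorCode);
            } else {
                outcome.append("ok");
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        CompletableFuture<Response> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("not sent in time"));

        CompletableFuture<Response> rejected = new CompletableFuture<>();
        rejected.complete(new Response("COORDINATOR_NOT_AVAILABLE"));

        System.out.println(describe(timedOut));
        System.out.println(describe(rejected));
    }
}
```

A callback written this way has to handle both branches, which is why the thread asks for a completion time that is available on the exceptional path as well.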
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1360369521 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -254,28 +250,34 @@ public String toString() { } } -public static class FutureCompletionHandler implements RequestCompletionHandler { - -private final CompletableFuture future; +public static class FutureCompletionHandler extends CompletableFuture implements RequestCompletionHandler { Review Comment: Why do you want to inherit from `CompletableFuture`? Extending base classes might lead to the fragile base class problem. I do not see that your code manifests this problem at the moment, but I think it would be better to minimize the surface for such problem if possible and if it does not cost too much. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), coordinatorRequestManager.coordinator()); -request.future().whenComplete((response, exception) -> { +request.handler().whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); } else { -// TODO: Currently, we lack a good way to propage the response time from the network client to the -// request handler. We will need to store the response time in the handler to make it accessible. -onFailure(exception, time.milliseconds()); +onFailure(exception, request.handler().completionTimeMs()); Review Comment: I am not sure, I understand. If `onComplete()` is never called with response == `null` why is there a branch in the completion handler for that case that calls `onFailure()`? 
Wouldn't it be better to throw an `IllegalStateException` or similar?
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1360302255 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs, private void retry(final long currentTimeMs) { onFailedAttempt(currentTimeMs); -onSendAttempt(currentTimeMs); Review Comment: Ah, yeah that makes sense! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on PR #14532: URL: https://github.com/apache/kafka/pull/14532#issuecomment-1761979623 Hi @cadonna - Thank you for putting time into this PR, very much appreciated. I responded to some of your questions, let me know if there is still any ambiguity left. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1358656536 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), coordinatorRequestManager.coordinator()); -request.future().whenComplete((response, exception) -> { +request.handler().whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); } else { -// TODO: Currently, we lack a good way to propage the response time from the network client to the -// request handler. We will need to store the response time in the handler to make it accessible. -onFailure(exception, time.milliseconds()); +onFailure(exception, request.handler().completionTimeMs());
Review Comment: I think the only time we invoke `onComplete()` is when a response is available (see below). A `null` response indicates that the request has not been sent out; therefore, in the `NetworkClientDelegate`, we need to actively fail the request on timeout.
`onComplete()` is invoked when a response is available:
```
private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}
```
We actively expire the unsent requests and fail them with a `TimeoutException`:
```
if (unsent.timer.isExpired()) {
    iterator.remove();
    unsent.handler.onFailure(currentTimeMs, new TimeoutException(
        "Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
    continue;
}
```
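Editorial sketch of the handler behaviour discussed above: the completion time is recorded on both the success path and the failure path, before the future is completed, so that callbacks can read it. This is a simplified illustration, not the merged Kafka class. `Response` is a hypothetical stand-in for `ClientResponse`; the method names follow the diff quoted in the thread.

```java
import java.util.concurrent.CompletableFuture;

public class CompletionHandlerSketch {
    /** Hypothetical stand-in for ClientResponse; only the received time matters here. */
    static final class Response {
        private final long receivedTimeMs;
        Response(long receivedTimeMs) { this.receivedTimeMs = receivedTimeMs; }
        long receivedTimeMs() { return receivedTimeMs; }
    }

    /** Wraps a future (composition) and remembers when the request completed. */
    static final class FutureCompletionHandler {
        private long responseCompletionTimeMs;
        private final CompletableFuture<Response> future = new CompletableFuture<>();

        /** Failure path: the caller supplies the timestamp, no clock is read here. */
        void onFailure(long currentTimeMs, RuntimeException e) {
            responseCompletionTimeMs = currentTimeMs; // set before completing the future
            future.completeExceptionally(e);
        }

        /** Success path: the time comes from the response itself. */
        void onComplete(Response response) {
            responseCompletionTimeMs = response.receivedTimeMs();
            future.complete(response);
        }

        long completionTimeMs() { return responseCompletionTimeMs; }

        CompletableFuture<Response> future() { return future; }
    }

    public static void main(String[] args) {
        FutureCompletionHandler handler = new FutureCompletionHandler();
        // The callback reads the time from the handler, not from the system clock.
        handler.future().whenComplete((response, exception) ->
            System.out.println("completed at " + handler.completionTimeMs()));
        handler.onFailure(1_500L, new RuntimeException("Failed to send request"));
    }
}
```

Assigning the timestamp before completing the future matters because `whenComplete` callbacks registered earlier run synchronously inside `complete()` or `completeExceptionally()`.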
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1358640330 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -254,28 +250,38 @@ public String toString() { } } -public static class FutureCompletionHandler implements RequestCompletionHandler { +public static class FutureCompletionHandler extends CompletableFuture implements RequestCompletionHandler { -private final CompletableFuture future; +/** + * The time when the response is completed. This is used when the response is completed exceptionally because + * ClientResponse already contains received time which is injected by the network client. + */ Review Comment: Thanks. What I was trying to say is that `responseCompletionTimeMs` is mainly used when the future is completed exceptionally. For successful requests, we could use either the `ClientResponse` or this time. I'll remove this comment to avoid the ambiguity.
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1358637014 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs, private void retry(final long currentTimeMs) { onFailedAttempt(currentTimeMs); -onSendAttempt(currentTimeMs);
Review Comment: This call is a duplicate because the manager only sends out the request when it is polled via the `poll()` method, which drains the unsent requests from the queue. `drain()` already invokes `onSendAttempt()`. See:
```
for (OffsetFetchRequestState request : partitionedBySendability.get(true)) {
    request.onSendAttempt(currentTimeMs);
    unsentRequests.add(request.toUnsentRequest());
    inflightOffsetFetches.add(request);
}
```
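Editorial sketch of why the extra `onSendAttempt()` call in `retry()` was redundant: if the send attempt is recorded only when requests are drained, each actual send is counted exactly once. The classes below are hypothetical simplifications (the re-queueing in `retry()` in particular is an assumption made for the demo), not the `CommitRequestManager` code.

```java
import java.util.ArrayList;
import java.util.List;

public class DrainSketch {
    /** Hypothetical request state that tracks attempts, loosely modelled on the thread. */
    static final class RequestState {
        int sendAttempts = 0;
        long lastSentMs = -1L;
        long lastFailedMs = -1L;

        void onSendAttempt(long currentTimeMs) {
            sendAttempts++;
            lastSentMs = currentTimeMs;
        }

        void onFailedAttempt(long currentTimeMs) {
            lastFailedMs = currentTimeMs;
        }
    }

    private final List<RequestState> pending = new ArrayList<>();

    void enqueue(RequestState request) {
        pending.add(request);
    }

    /** Records the failure and re-queues; it does NOT record a send attempt. */
    void retry(RequestState request, long currentTimeMs) {
        request.onFailedAttempt(currentTimeMs);
        pending.add(request); // assumption: retried requests go back to the queue
    }

    /** The single place where a send attempt is recorded. */
    List<RequestState> drain(long currentTimeMs) {
        List<RequestState> sent = new ArrayList<>(pending);
        pending.clear();
        for (RequestState request : sent) {
            request.onSendAttempt(currentTimeMs);
        }
        return sent;
    }

    public static void main(String[] args) {
        DrainSketch manager = new DrainSketch();
        RequestState request = new RequestState();
        manager.enqueue(request);
        manager.drain(100L);          // first send
        manager.retry(request, 200L); // failure recorded, no send attempt yet
        manager.drain(300L);          // second send
        System.out.println(request.sendAttempts); // two sends, two attempts
    }
}
```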
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1356928191 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs, private void retry(final long currentTimeMs) { onFailedAttempt(currentTimeMs); -onSendAttempt(currentTimeMs); Review Comment: Why is this call removed? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -254,28 +250,38 @@ public String toString() { } } -public static class FutureCompletionHandler implements RequestCompletionHandler { +public static class FutureCompletionHandler extends CompletableFuture implements RequestCompletionHandler { -private final CompletableFuture future; +/** + * The time when the response is completed. This is used when the response is completed exceptionally because + * ClientResponse already contains received time which is injected by the network client. + */ Review Comment: I do not understand this comment. Variable `responseCompletionTimeMs` is also set when the request is completed successfully. I actually think we do not need this comment. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), coordinatorRequestManager.coordinator()); -request.future().whenComplete((response, exception) -> { +request.handler().whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); } else { -// TODO: Currently, we lack a good way to propage the response time from the network client to the -// request handler. We will need to store the response time in the handler to make it accessible. -onFailure(exception, time.milliseconds()); +onFailure(exception, request.handler().completionTimeMs()); Review Comment: I do not completely understand how this should work. If the response is `null`, then I assume `onComplete()` of `NetworkClientDelegate` is also not called, because that would lead to a `NullPointerException`. If `onComplete()` of `NetworkClientDelegate` is not called, the `completionTimeMs` field in `NetworkClientDelegate` is not set. Thus, `request.handler().completionTimeMs()` will not return the completion time. Am I missing something?
Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee commented on PR #14532: URL: https://github.com/apache/kafka/pull/14532#issuecomment-1758856125 @cadonna @mjsax @lucasbru - Would any of you be interested in reviewing this PR? I think this is a fairly straightforward patch.
[PR] KAFKA-15534: Inject request completion time when the request failed [kafka]
philipnee opened a new pull request, #14532: URL: https://github.com/apache/kafka/pull/14532 In several places in the code, we rely on a system time call to get the response time. This is not ideal because these system calls can add up. Instead, the time is already retrieved at the top of the background thread's event loop and then propagated into `NetworkClientDelegate.poll`. We want to use that timestamp when we need to fail a request.
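Editorial sketch of the idea in the PR description: the event loop reads the clock once, passes that value into `poll`, and the same value is injected into any request that has to be failed. This is a minimal illustration under stated assumptions; `LoopTimeSketch`, `Unsent`, and the deadline field are hypothetical names, and the real `NetworkClientDelegate` uses a timer rather than a raw deadline.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

public class LoopTimeSketch {
    /** Hypothetical stand-in for an unsent request with a send deadline. */
    static final class Unsent {
        final long deadlineMs;
        long failedAtMs = -1L; // -1 means "not failed"

        Unsent(long deadlineMs) { this.deadlineMs = deadlineMs; }

        void onFailure(long currentTimeMs) { failedAtMs = currentTimeMs; }
    }

    private final Queue<Unsent> unsent = new ArrayDeque<>();

    void add(Unsent request) { unsent.add(request); }

    /** Fails expired requests with the caller's timestamp; no clock is read here. */
    List<Unsent> poll(long currentTimeMs) {
        List<Unsent> failed = new ArrayList<>();
        Iterator<Unsent> it = unsent.iterator();
        while (it.hasNext()) {
            Unsent request = it.next();
            if (currentTimeMs >= request.deadlineMs) {
                it.remove();
                request.onFailure(currentTimeMs); // inject the loop's timestamp
                failed.add(request);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        LoopTimeSketch delegate = new LoopTimeSketch();
        delegate.add(new Unsent(1_000L));
        long loopTimeMs = 1_500L; // stands in for one clock read at the top of the loop
        List<Unsent> failed = delegate.poll(loopTimeMs);
        System.out.println(failed.get(0).failedAtMs);
    }
}
```

Passing the timestamp in also makes the behaviour easy to test with a fixed value, which is what the tests added later in the thread rely on.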