Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-20 Thread via GitHub


cadonna merged PR #14532:
URL: https://github.com/apache/kafka/pull/14532


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-20 Thread via GitHub


cadonna commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1772246862

   Build failures are unrelated.





Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-19 Thread via GitHub


philipnee commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1771337422

   Thank you @cadonna, I adopted your suggestion. Thanks again!





Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-19 Thread via GitHub


cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1365517149


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -97,6 +99,29 @@ public void testTimeoutAfterSend() throws Exception {
 }
 }
 
+@Test
+public void testEnsureCorrectCompletionTimeOnFailure() {
+NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
+long timeMs = time.milliseconds();
+unsentRequest.handler().onFailure(timeMs, new TimeoutException());
+
+time.sleep(100);
+assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
+}
+
+@Test
+public void testEnsureCorrectCompletionTimeOnComplete() throws IOException {
+NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
+prepareFindCoordinatorResponse(Errors.NONE);
+long timeMs = time.milliseconds();
+try (NetworkClientDelegate delegate = newNetworkClientDelegate()) {
+delegate.send(unsentRequest);
+delegate.poll(0, timeMs);
+}
+time.sleep(100);
+assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
+}
+

Review Comment:
   If you use a mock for the client response, the test becomes simpler.
   ```suggestion
   @Test
   public void testEnsureCorrectCompletionTimeOnComplete() {
       NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
       long timeMs = time.milliseconds();
       final ClientResponse response = mock(ClientResponse.class);
       when(response.receivedTimeMs()).thenReturn(timeMs);

       unsentRequest.handler().onComplete(response);
       time.sleep(100);

       assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
   }
   ```






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-19 Thread via GitHub


cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1365044416


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 new ArrayList<>(this.requestedPartitions),
 throwOnFetchStableOffsetUnsupported);
 return new NetworkClientDelegate.UnsentRequest(
-builder,
-coordinatorRequestManager.coordinator(),
-(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
+builder,
+coordinatorRequestManager.coordinator(),
+(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));

Review Comment:
   Of course, mine was a question for my understanding.






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1769876947

   Hi @cadonna, thank you for spending time reviewing the PR again. I have answered your comments and attached a follow-up ticket and a minor PR: https://github.com/apache/kafka/pull/14581
   
   That PR is based on this one, so its commit history looks a bit messy; once we merge this PR, I will rebase it.
   
   Let me know if you have more questions about this PR; I'd love to address them.





Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364845881


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());

Review Comment:
   Thanks
   `request.handler().completionTimeMs() instead of response.receivedTimeMs()` 
- This is addressed
   
   `UnsentRequest using request.handler() would not be possible` - Let me 
quickly follow up with a MINOR PR to address this comment.






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364842391


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
 }
 }
 
+@Test
+public void testNetworkTimeout() {

Review Comment:
   I'm not entirely sure about the behavior of the OffsetRequestManager, so I'll need to get back to this. I added a ticket for it: KAFKA-15642






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364833452


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
 }
 }
 
+@Test
+public void testNetworkTimeout() {

Review Comment:
   CoordinatorRequestManager doesn't have such a test, so I added one.






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364830057


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
 }
 }
 
+@Test
+public void testNetworkTimeout() {

Review Comment:
   Per my previous comment: network timeouts aren't handled correctly in the CommitRequestManager, so I want to address this in that PR. I will check the other request managers to ensure this is tested.






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364829502


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 new ArrayList<>(this.requestedPartitions),
 throwOnFetchStableOffsetUnsupported);
 return new NetworkClientDelegate.UnsentRequest(
-builder,
-coordinatorRequestManager.coordinator(),
-(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
+builder,
+coordinatorRequestManager.coordinator(),
+(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));

Review Comment:
   In fact, this is a bug, and I'm planning to address it in [KAFKA-15562](https://issues.apache.org/jira/browse/KAFKA-15562). Can we close the gap in a follow-up PR?






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364820542


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -256,27 +256,39 @@ public String toString() {
 
 public static class FutureCompletionHandler implements RequestCompletionHandler {
 
+private long responseCompletionTimeMs;
 private final CompletableFuture<ClientResponse> future;
 
 FutureCompletionHandler() {
-this.future = new CompletableFuture<>();
+future = new CompletableFuture<>();
 }
 
-public void onFailure(final RuntimeException e) {
-future.completeExceptionally(e);
+public void onFailure(final long currentTimeMs, final RuntimeException e) {

Review Comment:
   You are right, I should add one there.






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1363336947


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());

Review Comment:
   nit:
   I was wondering whether you should use 
`request.handler().completionTimeMs()` instead of `response.receivedTimeMs()` 
here and in other places. IMO, it would make the calls clearer because 
`request.handler()` would be the single component that manages the completion 
time.
   I also realized that if the completion handler is passed into the constructor of `UnsentRequest`, using `request.handler()` would not be possible. I was also thinking that maybe you should get rid of that constructor because it makes the code less readable.
   Those are just nit comments to think about. They do not block merging this 
PR.   



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -256,27 +256,39 @@ public String toString() {
 
 public static class FutureCompletionHandler implements RequestCompletionHandler {
 
+private long responseCompletionTimeMs;
 private final CompletableFuture<ClientResponse> future;
 
 FutureCompletionHandler() {
-this.future = new CompletableFuture<>();
+future = new CompletableFuture<>();
 }
 
-public void onFailure(final RuntimeException e) {
-future.completeExceptionally(e);
+public void onFailure(final long currentTimeMs, final RuntimeException e) {

Review Comment:
   I could not find unit tests to verify the setting of the correct completion 
time in `onFailure()` and `onComplete()`.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
 }
 }
 
+@Test
+public void testNetworkTimeout() {

Review Comment:
   I expected such a test in the other request manager tests as well. Apart from this one, the only request manager test that has such a test is the one for `HeartbeatRequestManager`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -148,30 +148,33 @@ private Optional send(final long currentTim
 
 private NetworkClientDelegate.UnsentRequest createUnsentRequest(
 final MetadataRequest.Builder request) {
-return new NetworkClientDelegate.UnsentRequest(
-request,
-Optional.empty(),
-this::processResponseOrException
-);
-}
+NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest(
+request,
+Optional.empty());
+
+unsent.future().whenComplete((response, exception) -> {
+if (response == null) {
+// Backoff if the error is retriable

Review Comment:
   I think this is not the right place for this comment because there is no 
concept of a backoff here. I think you should remove it.
   
   (Maybe you realised by now that I am not a big fan of inline comments. I 
think they eventually start to lie. In this case it is even very likely because 
you cannot see the code to which this comment refers. If something changes in 
`handleError()`, it is very unlikely that this comment will be updated.)   



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 new ArrayList<>(this.requestedPartitions),
 throwOnFetchStableOffsetUnsupported);
 return new NetworkClientDelegate.UnsentRequest(
-builder,
-coordinatorRequestManager.coordinator(),
-(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
+builder,
+coordinatorRequestManager.coordinator(),
+(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));

Review Comment:
   Out of curiosity, can the response never be `null` here, unlike in `HeartbeatRequestManager`? Can timeouts not happen here?






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-17 Thread via GitHub


cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1363231692


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 new ConsumerGroupHeartbeatRequest.Builder(data),
 coordinatorRequestManager.coordinator());
-request.future().whenComplete((response, exception) -> {
+request.handler().whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());
 } else {
-// TODO: Currently, we lack a good way to propage the response time from the network client to the
-//  request handler. We will need to store the response time in the handler to make it accessible.
-onFailure(exception, time.milliseconds());
+onFailure(exception, request.handler().completionTimeMs());

Review Comment:
   OK, this was a misunderstanding on my side.






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-16 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1361282001


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 new ConsumerGroupHeartbeatRequest.Builder(data),
 coordinatorRequestManager.coordinator());
-request.future().whenComplete((response, exception) -> {
+request.handler().whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());
 } else {
-// TODO: Currently, we lack a good way to propage the response time from the network client to the
-//  request handler. We will need to store the response time in the handler to make it accessible.
-onFailure(exception, time.milliseconds());
+onFailure(exception, request.handler().completionTimeMs());

Review Comment:
   The future may be completed exceptionally with an exception, or completed normally with a response that carries an error code. Is this where the confusion comes from? A retriable error can show up both in the response and in the throwable.
   
   Should I make `onComplete` accept both the error and the response, like `void onComplete(ClientResponse response, Throwable error);`, to be more consistent?
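To make the two completion paths concrete, here is a minimal, self-contained sketch (not code from this PR). `CodedResponse` and `OutcomeClassifier` are hypothetical stand-ins for `ClientResponse` and a request manager's callback; throwing `IllegalStateException` when neither a response nor an error is present is only an assumed way to surface a branch that should never happen.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ClientResponse: a broker error code (0 = none) plus receive time.
final class CodedResponse {
    final short errorCode;
    final long receivedTimeMs;

    CodedResponse(short errorCode, long receivedTimeMs) {
        this.errorCode = errorCode;
        this.receivedTimeMs = receivedTimeMs;
    }
}

final class OutcomeClassifier {
    // Maps the (response, error) pair passed to whenComplete() onto one description.
    static String classify(CodedResponse response, Throwable error) {
        if (error != null) {
            // Completed exceptionally, e.g. the request expired before it was sent.
            return "failed without response: " + error.getClass().getSimpleName();
        }
        if (response == null) {
            // Neither a response nor an error: treat it as a programming error.
            throw new IllegalStateException("completed with neither response nor error");
        }
        if (response.errorCode != 0) {
            // Completed normally, but the broker reported an error in the response.
            return "response with error code " + response.errorCode;
        }
        return "success";
    }

    // Registers the classifier on a future and stores the outcome in outcome[0].
    static void watch(CompletableFuture<CodedResponse> future, String[] outcome) {
        future.whenComplete((response, error) -> outcome[0] = classify(response, error));
    }
}
```

A single `(response, error)` callback like this covers every branch, which is roughly what a combined `onComplete(ClientResponse, Throwable)` signature would buy.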






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-16 Thread via GitHub


cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1360369521


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -254,28 +250,34 @@ public String toString() {
 }
 }
 
-public static class FutureCompletionHandler implements RequestCompletionHandler {
-
-private final CompletableFuture<ClientResponse> future;
+public static class FutureCompletionHandler extends CompletableFuture<ClientResponse> implements RequestCompletionHandler {

Review Comment:
   Why do you want to inherit from `CompletableFuture`? Extending base classes might lead to the fragile base class problem. I do not see that your code manifests this problem at the moment, but I think it would be better to minimize the surface for such problems if possible and if it does not cost too much.
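A minimal sketch of the composition alternative, under assumptions: `ComposedCompletionHandler` and `FakeResponse` are hypothetical simplifications, not Kafka's `FutureCompletionHandler` or `ClientResponse`. The handler owns a `CompletableFuture` instead of extending one, and records the completion time on both the success and the failure path.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ClientResponse; only the received time matters here.
final class FakeResponse {
    private final long receivedTimeMs;

    FakeResponse(long receivedTimeMs) {
        this.receivedTimeMs = receivedTimeMs;
    }

    long receivedTimeMs() {
        return receivedTimeMs;
    }
}

// Holds a future instead of extending CompletableFuture, so the handler does not
// inherit (and expose) the whole CompletableFuture API.
final class ComposedCompletionHandler {
    private final CompletableFuture<FakeResponse> future = new CompletableFuture<>();
    private long completionTimeMs = -1L; // -1 until the request completes or fails

    // Success: reuse the receive time stamped by the network layer.
    // The time is set before completing so callbacks can already read it.
    void onComplete(FakeResponse response) {
        completionTimeMs = response.receivedTimeMs();
        future.complete(response);
    }

    // Failure: no response exists, so the caller injects the current time.
    void onFailure(long currentTimeMs, RuntimeException e) {
        completionTimeMs = currentTimeMs;
        future.completeExceptionally(e);
    }

    long completionTimeMs() {
        return completionTimeMs;
    }

    CompletableFuture<FakeResponse> future() {
        return future;
    }
}
```

Setting the time before completing the future matters because `whenComplete` callbacks registered earlier run synchronously inside `complete()` / `completeExceptionally()`, so they can already read `completionTimeMs()`.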



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 new ConsumerGroupHeartbeatRequest.Builder(data),
 coordinatorRequestManager.coordinator());
-request.future().whenComplete((response, exception) -> {
+request.handler().whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());
 } else {
-// TODO: Currently, we lack a good way to propage the response time from the network client to the
-//  request handler. We will need to store the response time in the handler to make it accessible.
-onFailure(exception, time.milliseconds());
+onFailure(exception, request.handler().completionTimeMs());

Review Comment:
   I am not sure I understand. If `onComplete()` is never called with response == `null`, why is there a branch in the completion handler for that case that calls `onFailure()`? Wouldn't it be better to throw an `IllegalStateException` or similar?






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-16 Thread via GitHub


cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1360302255


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs,
 
 private void retry(final long currentTimeMs) {
 onFailedAttempt(currentTimeMs);
-onSendAttempt(currentTimeMs);

Review Comment:
   Ah, yeah that makes sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1761979623

   Hi @cadonna, thank you for putting time into this PR; it is very much appreciated. I responded to some of your questions. Let me know if there is still any ambiguity left.





Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1358656536


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 new ConsumerGroupHeartbeatRequest.Builder(data),
 coordinatorRequestManager.coordinator());
-request.future().whenComplete((response, exception) -> {
+request.handler().whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());
 } else {
-// TODO: Currently, we lack a good way to propage the response time from the network client to the
-//  request handler. We will need to store the response time in the handler to make it accessible.
-onFailure(exception, time.milliseconds());
+onFailure(exception, request.handler().completionTimeMs());

Review Comment:
   I think the only time we invoke `onComplete()` is when a response is available (see below). If the response is `null`, the request has not been sent out; therefore, in `NetworkClientDelegate`, we need to actively fail the request on timeout.
   
   `onComplete()` is invoked when a response is available:
   ```
   private void completeResponses(List<ClientResponse> responses) {
       for (ClientResponse response : responses) {
           try {
               response.onComplete();
           } catch (Exception e) {
               log.error("Uncaught error in request completion:", e);
           }
       }
   }
   ```
   
   We actively expire unsent requests and fail them with a `TimeoutException`:
   ```
   if (unsent.timer.isExpired()) {
       iterator.remove();
       unsent.handler.onFailure(currentTimeMs, new TimeoutException(
           "Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
       continue;
   }
   ```
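A simplified, self-contained sketch of the expiry loop above, under stated assumptions: `PendingRequest`, `UnsentQueue`, and `RequestTimeoutException` are hypothetical (the real code uses a `Timer` and Kafka's unchecked `TimeoutException`). The point it illustrates is that the poll's `currentTimeMs` becomes the failed request's completion time, because there is no `ClientResponse` to carry a received time.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Hypothetical unchecked timeout, standing in for Kafka's TimeoutException.
final class RequestTimeoutException extends RuntimeException {
    RequestTimeoutException(String message) {
        super(message);
    }
}

// Hypothetical unsent request with an absolute deadline and a record of its failure.
final class PendingRequest {
    final long deadlineMs;
    long completionTimeMs = -1L; // -1 until the request is completed or failed
    RuntimeException error;

    PendingRequest(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    void onFailure(long currentTimeMs, RuntimeException e) {
        completionTimeMs = currentTimeMs; // no response, so inject the poll time
        error = e;
    }
}

final class UnsentQueue {
    final Queue<PendingRequest> unsent = new ArrayDeque<>();

    // Removes every request whose deadline has passed and fails it with the current time.
    void expire(long currentTimeMs) {
        Iterator<PendingRequest> iterator = unsent.iterator();
        while (iterator.hasNext()) {
            PendingRequest request = iterator.next();
            if (currentTimeMs >= request.deadlineMs) {
                iterator.remove();
                request.onFailure(currentTimeMs, new RequestTimeoutException(
                        "Failed to send request before deadline " + request.deadlineMs + " ms."));
            }
        }
    }
}
```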






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1358640330


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -254,28 +250,38 @@ public String toString() {
 }
 }
 
-public static class FutureCompletionHandler implements RequestCompletionHandler {
+public static class FutureCompletionHandler extends CompletableFuture<ClientResponse> implements RequestCompletionHandler {
 
-private final CompletableFuture<ClientResponse> future;
+/**
+ * The time when the response is completed. This is used when the response is completed exceptionally because
+ * ClientResponse already contains received time which is injected by the network client.
+ */

Review Comment:
   Thanks. What I was trying to say is that `responseCompletionTimeMs` is mainly used when the future is completed exceptionally. For successful requests, we could use either the `ClientResponse` or this time. I'll remove this comment to avoid the ambiguity.






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1358637014


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs,
 
 private void retry(final long currentTimeMs) {
 onFailedAttempt(currentTimeMs);
-onSendAttempt(currentTimeMs);

Review Comment:
   This call is a duplicate because the manager only sends out the request when it is polled via the `poll()` method, which calls `drain()` to take the unsent requests from the queue. `drain()` already invokes `onSendAttempt()`.
   
   See
   ```
   for (OffsetFetchRequestState request : partitionedBySendability.get(true)) {
       request.onSendAttempt(currentTimeMs);
       unsentRequests.add(request.toUnsentRequest());
       inflightOffsetFetches.add(request);
   }
   ```
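A toy model of why the extra `onSendAttempt()` in `retry()` would double-count, assuming hypothetical `AttemptState` and `OffsetFetchSketch` classes that only count attempts (the backoff bookkeeping of the real request state is omitted). Recording the send attempt only in `drain()` yields exactly one recorded attempt per actual send.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical request state that only counts attempts (no backoff logic).
final class AttemptState {
    int sendAttempts = 0;
    int failedAttempts = 0;
    long lastSentMs = -1L;
    long lastFailedMs = -1L;

    void onSendAttempt(long nowMs) {
        sendAttempts++;
        lastSentMs = nowMs;
    }

    void onFailedAttempt(long nowMs) {
        failedAttempts++;
        lastFailedMs = nowMs;
    }
}

// Hypothetical manager: requests wait in `pending` until drain() sends them.
final class OffsetFetchSketch {
    final List<AttemptState> pending = new ArrayList<>();

    // Records the failure and re-queues; the send attempt is recorded later by drain().
    void retry(AttemptState state, long nowMs) {
        state.onFailedAttempt(nowMs);
        pending.add(state);
    }

    // The single place where a send attempt is recorded, mirroring drain() above.
    List<AttemptState> drain(long nowMs) {
        List<AttemptState> toSend = new ArrayList<>(pending);
        pending.clear();
        for (AttemptState state : toSend) {
            state.onSendAttempt(nowMs);
        }
        return toSend;
    }
}
```

With a send, a retry, and a second send, this records two send attempts; if `retry()` also called `onSendAttempt()`, the same scenario would report three send attempts for two real sends.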






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-13 Thread via GitHub


cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1356928191


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs,
 
 private void retry(final long currentTimeMs) {
 onFailedAttempt(currentTimeMs);
-onSendAttempt(currentTimeMs);

Review Comment:
   Why is this call removed?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -254,28 +250,38 @@ public String toString() {
 }
 }
 
-public static class FutureCompletionHandler implements RequestCompletionHandler {
+public static class FutureCompletionHandler extends CompletableFuture implements RequestCompletionHandler {
 
-private final CompletableFuture future;
+/**
+ * The time when the response is completed. This is used when the response is completed exceptionally because
+ * ClientResponse already contains received time which is injected by the network client.
+ */

Review Comment:
   I do not understand this comment. Variable `responseCompletionTimeMs` is also set when the request is completed successfully. I actually think we do not need this comment.
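   For reference, a minimal sketch of a handler that is itself the future and records its completion time on both paths, which is why a comment limiting the field to exceptional completion reads as misleading. `TimedCompletionHandler` and `SketchResponse` are hypothetical stand-ins, not the PR's `FutureCompletionHandler` or Kafka's `ClientResponse`; the sketch assumes the success path reuses the response's received time and the failure path is given a timestamp by the caller.

```java
import java.util.concurrent.CompletableFuture;

public class CompletionTimeSketch {

    // Hypothetical stand-in for ClientResponse: carries its own received time.
    static final class SketchResponse {
        final String body;
        final long receivedTimeMs;

        SketchResponse(String body, long receivedTimeMs) {
            this.body = body;
            this.receivedTimeMs = receivedTimeMs;
        }
    }

    // Hypothetical handler that is itself the future, so callers can chain on it
    // and also ask when it completed, on either path.
    static class TimedCompletionHandler extends CompletableFuture<SketchResponse> {
        // -1 means "not completed yet"; volatile because callbacks may read it
        // from a different thread than the one that completed the future.
        private volatile long completionTimeMs = -1L;

        long completionTimeMs() {
            return completionTimeMs;
        }

        // Success path: reuse the received time the response already carries.
        void onComplete(SketchResponse response) {
            completionTimeMs = response.receivedTimeMs;
            complete(response);
        }

        // Failure path: there is no response, so the caller injects a timestamp
        // it already holds (e.g. the one read at the top of the event loop).
        void onFailure(long currentTimeMs, RuntimeException error) {
            completionTimeMs = currentTimeMs;
            completeExceptionally(error);
        }
    }

    public static void main(String[] args) {
        TimedCompletionHandler ok = new TimedCompletionHandler();
        ok.onComplete(new SketchResponse("heartbeat-ok", 42L));

        TimedCompletionHandler failed = new TimedCompletionHandler();
        failed.onFailure(99L, new RuntimeException("disconnected"));

        System.out.println(ok.completionTimeMs() + " " + failed.completionTimeMs());
    }
}
```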



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 new ConsumerGroupHeartbeatRequest.Builder(data),
 coordinatorRequestManager.coordinator());
-request.future().whenComplete((response, exception) -> {
+request.handler().whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());
 } else {
-// TODO: Currently, we lack a good way to propage the response time from the network client to the
-//  request handler. We will need to store the response time in the handler to make it accessible.
-onFailure(exception, time.milliseconds());
+onFailure(exception, request.handler().completionTimeMs());

Review Comment:
   I do not completely understand how this should work. If the response is `null`, then I assume `onComplete()` of `NetworkClientDelegate` is also not called, because that would lead to a `NullPointerException`. If `onComplete()` of `NetworkClientDelegate` is not called, the `completionTimeMs` field in `NetworkClientDelegate` is not set. Thus, `request.handler().completionTimeMs()` will not return the completion time.
   Am I missing something?
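   One way the failure branch could see a meaningful value is a separate failure hook on the handler that records the injected timestamp before completing the future. The sketch below assumes such a hook exists; `Handler`, `onFailure` and `onFailureWrongOrder` are hypothetical names, not the PR's code. It also shows that ordering matters: a `whenComplete` callback registered before completion runs synchronously inside `completeExceptionally()`, so a timestamp assigned afterwards is not visible to it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class FailureTimeSketch {

    // Hypothetical handler; only the failure path is modelled here.
    static class Handler extends CompletableFuture<String> {
        volatile long completionTimeMs = -1L; // -1 means "not set"

        // Record the time first, then complete: callbacks registered earlier
        // run synchronously inside completeExceptionally() and can read it.
        void onFailure(long currentTimeMs, RuntimeException e) {
            completionTimeMs = currentTimeMs;
            completeExceptionally(e);
        }

        // Wrong order, for contrast: callbacks fire before the field is set.
        void onFailureWrongOrder(long currentTimeMs, RuntimeException e) {
            completeExceptionally(e);
            completionTimeMs = currentTimeMs;
        }
    }

    // Registers a heartbeat-style callback and returns the time it observed.
    static long observedFailureTime(boolean correctOrder) {
        Handler handler = new Handler();
        AtomicLong seen = new AtomicLong(Long.MIN_VALUE);
        handler.whenComplete((response, exception) -> {
            if (response == null) {
                // Failure branch: read the time stored on the handler.
                seen.set(handler.completionTimeMs);
            }
        });
        RuntimeException error = new RuntimeException("disconnected");
        if (correctOrder) {
            handler.onFailure(500L, error);
        } else {
            handler.onFailureWrongOrder(500L, error);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(observedFailureTime(true));  // 500
        System.out.println(observedFailureTime(false)); // -1
    }
}
```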






Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-11 Thread via GitHub


philipnee commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1758856125

   @cadonna @mjsax @lucasbru - Would any of you be interested in reviewing this PR? I think this is a fairly straightforward patch.





[PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-11 Thread via GitHub


philipnee opened a new pull request, #14532:
URL: https://github.com/apache/kafka/pull/14532

   In several places in the code, we rely on a system time call to get the response time. This is not ideal because these system calls can add up. Instead, we can reuse the time that is already retrieved at the top of the background thread's event loop and then propagated into `NetworkClientDelegate.poll`.
   
   We want to use that timestamp when we need to fail a request.
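   A minimal sketch of the idea, assuming a single-threaded loop that reads the clock once per iteration through a `LongSupplier` and passes the value down to `poll`. `EventLoopTimeSketch` and `PendingRequest` are hypothetical, not the consumer's background thread classes. However many requests fail in one iteration, the clock is read only once and every failure carries that timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical event loop; names are illustrative, not Kafka's classes.
public class EventLoopTimeSketch {

    static class PendingRequest {
        final long deadlineMs;
        long failedAtMs = -1L; // -1 means "not failed"

        PendingRequest(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final LongSupplier clock;
    private final List<PendingRequest> inflight = new ArrayList<>();

    EventLoopTimeSketch(LongSupplier clock) {
        this.clock = clock;
    }

    void add(PendingRequest request) {
        inflight.add(request);
    }

    // One loop iteration: read the clock once and pass the value down.
    void runOnce() {
        long currentTimeMs = clock.getAsLong();
        poll(currentTimeMs);
    }

    // Stand-in for a delegate's poll(currentTimeMs): expired requests are
    // failed with the loop's timestamp instead of a fresh clock read each.
    private void poll(long currentTimeMs) {
        inflight.removeIf(request -> {
            if (currentTimeMs >= request.deadlineMs) {
                request.failedAtMs = currentTimeMs;
                return true;
            }
            return false;
        });
    }

    int inflightCount() {
        return inflight.size();
    }

    public static void main(String[] args) {
        EventLoopTimeSketch loop = new EventLoopTimeSketch(System::currentTimeMillis);
        PendingRequest expired = new PendingRequest(0L);
        loop.add(expired);
        loop.runOnce();
        System.out.println("failed at " + expired.failedAtMs + ", in flight: " + loop.inflightCount());
    }
}
```

   Injecting the clock as a `LongSupplier` also makes the behavior easy to test with a fixed time, in the same spirit as passing `currentTimeMs` explicitly rather than calling the system clock at each failure site.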

