Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-07 Thread via GitHub


cadonna commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2154323060

   Merged to `trunk` and cherry-picked to `3.8` 🥳 





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-07 Thread via GitHub


cadonna merged PR #16031:
URL: https://github.com/apache/kafka/pull/16031





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628473245


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   If I add that to `offsetExceptionSupplier()` and 
`isRetriableOnOffsetFetch()`, the relevant tests fail 🤔
   
   That error is being tested in `testOffsetFetchRequestPartitionDataError()` 
via its supplier `partitionDataErrorSupplier()`.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628462153


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   Those are right, but you're just missing `UnstableOffsetCommitException`, which is retriable and thrown on fetch [here](https://github.com/apache/kafka/blob/896af1b2f2f2a7d579e0aef074bcb2004c0246f2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1073).






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628457770


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   After removing the boolean flags from `offsetFetchExceptionSupplier()`, it 
became identical to `offsetCommitExceptionSupplier()`. So I refactored things a 
bit further and renamed `offsetCommitExceptionSupplier()` as 
`offsetExceptionSupplier()` for use by the relevant parameterized tests.
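   For reference, a rough sketch of what the merged `offsetExceptionSupplier()` might look like, based on the error list visible in the `offsetCommitExceptionSupplier()` diff further down this thread (the exact final form in the PR may differ):
   
   ```java
   private static Stream<Arguments> offsetExceptionSupplier() {
       // One Arguments entry per error; whether an error is retriable is decided in the
       // tests themselves (e.g. via a helper such as isRetriableOnOffsetFetch) rather
       // than passed in as a boolean flag.
       return Stream.of(
           Arguments.of(Errors.NOT_COORDINATOR),
           Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
           Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
           Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
           Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
           Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
           Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
           Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
           Arguments.of(Errors.REQUEST_TIMED_OUT),
           Arguments.of(Errors.FENCED_INSTANCE_ID),
           Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
           Arguments.of(Errors.STALE_MEMBER_EPOCH),
           Arguments.of(Errors.UNKNOWN_MEMBER_ID));
   }
   ```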






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2150969566

   Fixed `TieredStorageTestContext` breakage. Sorry about that 🤦‍♂️ 





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628442774


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   This is what I found I needed to make it work:
   
   ```java
   private boolean isRetriableOnOffsetFetch(Errors error) {
       return error == Errors.NOT_COORDINATOR ||
           error == Errors.COORDINATOR_LOAD_IN_PROGRESS ||
           error == Errors.COORDINATOR_NOT_AVAILABLE;
   }
   ```
   
   Does that seem correct, @lianetm?






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628354409


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   Yes, that makes sense to me. In the end it's a good point that we only have 2 retriable exceptions that are not considered retriable on the fetch path: unknown_topic and request timeout. So we could maybe simply have a helper `isRetriableOnFetch` that returns true if `error.exception() instanceof RetriableException && !unknown_topic && !request_timeout`, something like that. Up to you @kirktrue, but it makes sense to me to get rid of the extra arg to maintain and be explicit, as Bruno suggested.
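   A minimal sketch of the helper described above, assuming the `Errors` and `RetriableException` types already used in this test class (the name and exact shape are illustrative, not the final code):
   
   ```java
   private boolean isRetriableOnFetch(Errors error) {
       // Retriable by exception type, except the two errors the fetch path does not retry:
       // unknown topic/partition and request timeout.
       return error.exception() instanceof RetriableException
           && error != Errors.UNKNOWN_TOPIC_OR_PARTITION
           && error != Errors.REQUEST_TIMED_OUT;
   }
   ```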






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628332835


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   If the exceptions (not Java exceptions, but exceptions to the rule) are just a few, wouldn't it be better for readability to call them out explicitly here in the test?







Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628330597


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   I would say this one should stay like this, because this is the fetch path (where we do have errors that are retriable by the exception definition, but that we don't consider retriable).





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2150792103

   There is one failure caused by the TestUtils changes, breaking TieredStorageTestContext.java. Could you please check, @kirktrue? Thanks!





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628321190


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +613,46 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {

Review Comment:
   Do you also plan to use `error.exception() instanceof RetriableException` here?






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628306229


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -960,7 +960,7 @@ void maybeReconcile() {
 // best effort to commit the offsets in the case where the epoch might have changed while
 // the current reconciliation is in process. Note this is using the rebalance timeout as
 // it is the limit enforced by the broker to complete the reconciliation process.
-commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   You can do it as a MINOR pull request if you want, but it really is just a renaming of that variable and of the parameter in the constructor, as you correctly stated in the ticket. Regarding your point in the ticket that there is more to it than changing the name, I cannot completely follow. When somebody instantiates a `MembershipManagerImpl` and passes the config `rebalance.timeout.ms` into the constructor for the parameter `commitTimeoutDuringReconciliation`, they state that they want to use the value of the config `rebalance.timeout.ms` as the `commitTimeoutDuringReconciliation`. This seems quite clear to me.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628279848


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   Great, the outcome is what matters: we now have an integration test passing with the new consumer making progress on continuous poll(ZERO), yay!






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628208730


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   I'm not sure what I did wrong the first time. I was able to add my changes 
back and it works now for all combinations.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628166108


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState {
  */
 abstract CompletableFuture future();
 
-/**
- * Complete the request future with a TimeoutException if the request timeout has been
- * reached, based on the provided current time.
- */
-void maybeExpire(long currentTimeMs) {
-if (retryTimeoutExpired(currentTimeMs)) {
-removeRequest();
-isExpired = true;
-future().completeExceptionally(new TimeoutException(requestDescription() +
-" could not complete before timeout expired."));
-}
-}
-
 /**
  * Build request with the given builder, including response handling logic.
  */
 NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder builder) {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator());
+coordinatorRequestManager.coordinator(),
+time.timer(requestTimeoutMs)
+);
 request.whenComplete(
 (response, throwable) -> {
 long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   `s/currentTimeMs/completionTimeMs/`







Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628164186


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -960,7 +960,7 @@ void maybeReconcile() {
 // best effort to commit the offsets in the case where the epoch might have changed while
 // the current reconciliation is in process. Note this is using the rebalance timeout as
 // it is the limit enforced by the broker to complete the reconciliation process.
-commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   Created KAFKA-16899, since a little bit of thought needs to be given to how the name change is handled.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628148498


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -831,10 +823,9 @@ abstract class RetriableRequestState extends RequestState {
  * Complete the request future with a TimeoutException if the request timeout has been
  * reached, based on the provided current time.

Review Comment:
   Updated. PTAL.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628147009


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +614,44 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {
+futures.forEach(f -> assertFalse(f.isDone()));
+
+// Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each
+// OffsetFetchRequestState is evaluated via isExpired().
+time.sleep(defaultApiTimeoutMs);
+commitRequestManager.poll(time.milliseconds());
+futures.forEach(f -> assertFutureThrows(f, TimeoutException.class));
+} else {
+futures.forEach(f -> assertFutureThrows(f, KafkaException.class));
+assertEmptyPendingRequests(commitRequestManager);

Review Comment:
   Added checks.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628143167


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -1097,19 +1132,18 @@ private void testNonRetriable(final List offsetCommitExceptionSupplier() {
 return Stream.of(
-Arguments.of(Errors.NOT_COORDINATOR),
-Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
-Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
-Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
-Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
-Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
-Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
-Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
-Arguments.of(Errors.REQUEST_TIMED_OUT),
-Arguments.of(Errors.FENCED_INSTANCE_ID),
-Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
-Arguments.of(Errors.STALE_MEMBER_EPOCH),
-Arguments.of(Errors.UNKNOWN_MEMBER_ID));
+Arguments.of(Errors.NOT_COORDINATOR, true),
+Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true),
+Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
+Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false),
+Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
+Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
+Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
+Arguments.of(Errors.REQUEST_TIMED_OUT, false),
+Arguments.of(Errors.FENCED_INSTANCE_ID, false),
+Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.UNKNOWN_MEMBER_ID, false));

Review Comment:
   Fixed.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628142116


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
 result.complete(null);
 } else {
 if (error instanceof RetriableException) {
-if (error instanceof TimeoutException && requestAttempt.isExpired) {
+if (error instanceof TimeoutException && requestAttempt.isExpired()) {

Review Comment:
   I've updated the logic that checks `isExpired()`. PTAL at the latest 
version. Thanks!






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628140810


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -123,23 +122,6 @@ public void testEnsureCorrectCompletionTimeOnComplete() {
 assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
 }
 
-@Test
-public void testEnsureTimerSetOnAdd() {
-NetworkClientDelegate ncd = newNetworkClientDelegate();
-NetworkClientDelegate.UnsentRequest findCoordRequest = newUnsentFindCoordinatorRequest();
-assertNull(findCoordRequest.timer());
-
-// NetworkClientDelegate#add
-ncd.add(findCoordRequest);
-assertEquals(1, ncd.unsentRequests().size());
-assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs());
-
-// NetworkClientDelegate#addAll
-ncd.addAll(Collections.singletonList(findCoordRequest));
-assertEquals(1, ncd.unsentRequests().size());
-assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs());
-}
-

Review Comment:
   This change was reverted.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628139749


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List<UnsentRequest> requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   Thanks for the reminder to update the PR description, @lianetm! Done.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628082521


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState {
  */
 abstract CompletableFuture future();
 
-/**
- * Complete the request future with a TimeoutException if the request timeout has been
- * reached, based on the provided current time.
- */
-void maybeExpire(long currentTimeMs) {
-if (retryTimeoutExpired(currentTimeMs)) {
-removeRequest();
-isExpired = true;
-future().completeExceptionally(new TimeoutException(requestDescription() +
-" could not complete before timeout expired."));
-}
-}
-
 /**
  * Build request with the given builder, including response handling logic.
  */
 NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder builder) {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator());
+coordinatorRequestManager.coordinator(),
+time.timer(requestTimeoutMs)
+);
 request.whenComplete(
 (response, throwable) -> {
 long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   Sure!






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628049433


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List<UnsentRequest> requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   I see from the commits that we reverted back to the original approach, where the request timeout is defined at the network client level, common for all requests, and passed onto the request on add. Nice. Let's just update the PR description then (we can remove point 4).
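   For context, a sketch of the approach being referred to, where `add()` stamps the client-wide request timeout onto every request as it is enqueued (based on the removed line shown above; the queueing call is an assumption for illustration):
   
   ```java
   public void add(final UnsentRequest r) {
       Objects.requireNonNull(r);
       // The request timeout is configured once on the NetworkClientDelegate and
       // applied to each request when it is added.
       r.setTimer(this.time, this.requestTimeoutMs);
       unsentRequests.add(r);   // assumed queueing call, for illustration only
   }
   ```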






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628023841


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -831,10 +823,9 @@ abstract class RetriableRequestState extends RequestState {
  * Complete the request future with a TimeoutException if the request timeout has been
  * reached, based on the provided current time.

Review Comment:
   Let's update this to include the "if the request has been sent out at least 
once and...(timeout expired)"
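   For example, something along these lines (just a sketch of possible wording, similar to the comment visible in the `retryTimeoutExpired()` diff elsewhere in this thread):
   
   ```java
   /**
    * Complete the request future with a TimeoutException if the request has been sent out
    * at least once and the request timeout has been reached, based on the provided current time.
    */
   ```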






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1628017722


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TimedRequestStateTest {
+
+private final static long DEFAULT_TIMEOUT_MS = 3;
+private final Time time = new MockTime();
+
+@Test
+public void testIsExpired() {
+TimedRequestState state = new TimedRequestState(
+new LogContext(),
+this.getClass().getSimpleName(),
+100,
+1000,
+time.timer(DEFAULT_TIMEOUT_MS)
+);
+assertFalse(state.isExpired());
+time.sleep(DEFAULT_TIMEOUT_MS);
+assertTrue(state.isExpired());
+}
+
+@Test
+public void testRemainingMs() {
+TimedRequestState state = new TimedRequestState(
+new LogContext(),
+this.getClass().getSimpleName(),
+100,
+1000,
+time.timer(DEFAULT_TIMEOUT_MS)
+);
+assertEquals(DEFAULT_TIMEOUT_MS, state.remainingMs());
+time.sleep(DEFAULT_TIMEOUT_MS);
+assertEquals(0, state.remainingMs());
+}
+
+@Test
+public void testDeadlineTimer() {
+long deadlineMs = time.milliseconds() + DEFAULT_TIMEOUT_MS;
+Timer timer = TimedRequestState.deadlineTimer(time, deadlineMs);
+assertEquals(DEFAULT_TIMEOUT_MS, timer.remainingMs());
+timer.sleep(DEFAULT_TIMEOUT_MS);
+assertEquals(0, timer.remainingMs());
+}
+
+@Test
+public void testAllowOverdueDeadlineTimer() {
+long deadlineMs = time.milliseconds() - DEFAULT_TIMEOUT_MS;
+Timer timer = TimedRequestState.deadlineTimer(time, deadlineMs);
+assertEquals(0, timer.remainingMs());
+}
+
+@Test
+public void testToStringUpdatesTimer() {
+TimedRequestState state = new TimedRequestState(
+new LogContext(),
+this.getClass().getSimpleName(),
+100,
+1000,
+time.timer(DEFAULT_TIMEOUT_MS)
+);
+
+assertToString(state, DEFAULT_TIMEOUT_MS);
+time.sleep(DEFAULT_TIMEOUT_MS);
+assertToString(state, 0);
+}
+
+private void assertToString(TimedRequestState state, long timerMs) {
+String[] toString = state.toString().split(", ");
+assertTrue(toString.length > 0);
+String expected = "remainingMs=" + timerMs + "}";
+String actual = toString[toString.length - 1];
+assertEquals(expected, actual);

Review Comment:
   what about simplifying all this with a single:
   ```suggestion
   assertTrue(state.toString().contains("remainingMs=" + timerMs + "}"));
   ```






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1627986247


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +614,44 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {
+futures.forEach(f -> assertFalse(f.isDone()));
+
+// Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each
+// OffsetFetchRequestState is evaluated via isExpired().
+time.sleep(defaultApiTimeoutMs);
+commitRequestManager.poll(time.milliseconds());
+futures.forEach(f -> assertFutureThrows(f, TimeoutException.class));
+} else {
+futures.forEach(f -> assertFutureThrows(f, KafkaException.class));
+assertEmptyPendingRequests(commitRequestManager);

Review Comment:
   I see; still, even if we leave the inflight request, it would be helpful to assert that we're not leaving any unsent requests after we expire them on ln 637/638. Something like this would be reassuring:
   ```
   assertFalse(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
   commitRequestManager.poll(time.milliseconds());
   futures.forEach(f -> assertFutureThrows(f, TimeoutException.class));
   assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
   ```






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626764237


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   That is weird 🤔 I'll try to add it back in as part of the next round of 
feedback changes.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2148602850

   @lianetm @cadonna I believe I've handled the feedback (thanks!) and this is 
ready for another review.





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626761730


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -1097,19 +1132,18 @@ private void testNonRetriable(final List offsetCommitExceptionSupplier() {
 return Stream.of(
-Arguments.of(Errors.NOT_COORDINATOR),
-Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
-Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
-Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
-Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
-Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
-Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
-Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
-Arguments.of(Errors.REQUEST_TIMED_OUT),
-Arguments.of(Errors.FENCED_INSTANCE_ID),
-Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
-Arguments.of(Errors.STALE_MEMBER_EPOCH),
-Arguments.of(Errors.UNKNOWN_MEMBER_ID));
+Arguments.of(Errors.NOT_COORDINATOR, true),
+Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true),
+Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
+Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false),
+Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
+Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
+Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
+Arguments.of(Errors.REQUEST_TIMED_OUT, false),
+Arguments.of(Errors.FENCED_INSTANCE_ID, false),
+Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.UNKNOWN_MEMBER_ID, false));

Review Comment:
   Nice catch. Thanks!






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626759185


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+private final Timer timer;
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+this.timer = timer;
+}
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final int retryBackoffExpBase,
+ final long retryBackoffMaxMs,
+ final double jitter,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+this.timer = timer;
+}
+
+public boolean isExpired() {
+timer.update();
+return timer.isExpired();
+}
+
+public long remainingMs() {
+timer.update();
+return timer.remainingMs();
+}
+
+public static Timer deadlineTimer(final Time time, final long deadlineMs) {
+long diff = Math.max(0, deadlineMs - time.milliseconds());
+return time.timer(diff);
+}
+
+
+@Override
+protected String toStringBase() {
+return super.toStringBase() + ", timer=" + remainingMs();

Review Comment:
   Updated to use `remainingMs`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -313,7 +306,7 @@ public String toString() {
 "requestBuilder=" + requestBuilder +
 ", handler=" + handler +
 ", node=" + node +
-", timer=" + timer +
+", timer=" + timer.remainingMs() +

Review Comment:
   Changed. Thanks!






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626759656


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -875,8 +872,12 @@ private void handleClientResponse(final ClientResponse response,
 
 abstract void onResponse(final ClientResponse response);
 
-boolean retryTimeoutExpired(long currentTimeMs) {
-return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs;
+/**
+ * If at least one attempt has been sent, and the user-provided timeout has elapsed, consider the
+ * request as expired.
+ */
+boolean retryTimeoutExpired() {

Review Comment:
   ...and folding the checks into the caller directly.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -516,8 +527,9 @@ private void fetchOffsetsWithRetries(final OffsetFetchRequestState fetchRequest,
 result.complete(res);
 } else {
 if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
-if (error instanceof TimeoutException && fetchRequest.isExpired) {
-result.completeExceptionally(error);
+if (fetchRequest.isExpired()) {
+log.debug("Fetch request for {} timed out and won't be retried anymore", fetchRequest.requestedPartitions);

Review Comment:
   Done.






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626759390


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -875,8 +872,12 @@ private void handleClientResponse(final ClientResponse response,
 
 abstract void onResponse(final ClientResponse response);
 
-boolean retryTimeoutExpired(long currentTimeMs) {
-return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs;
+/**
+ * If at least one attempt has been sent, and the user-provided timeout has elapsed, consider the
+ * request as expired.
+ */
+boolean retryTimeoutExpired() {

Review Comment:
   I solved it by removing the method altogether 😉 






Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626752933


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +614,44 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set<TopicPartition> partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {
+futures.forEach(f -> assertFalse(f.isDone()));
+
+// Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each
+// OffsetFetchRequestState is evaluated via isExpired().
+time.sleep(defaultApiTimeoutMs);
+commitRequestManager.poll(time.milliseconds());
+futures.forEach(f -> assertFutureThrows(f, TimeoutException.class));
+} else {
+futures.forEach(f -> assertFutureThrows(f, KafkaException.class));
+assertEmptyPendingRequests(commitRequestManager);

Review Comment:
   I wasn't removing the inflight requests since the code in the response 
handler removes them. If we did remove the inflight requests before the response 
was received, the user would see the warning about the request not being in the 
"buffer" when it did eventually complete.

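   For illustration, a minimal sketch of the bookkeeping pattern described above 
(the class, map, and method names are invented for the sketch and are not the actual 
CommitRequestManager internals):

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeoutException;

   // Expiration fails the caller's future but leaves the in-flight entry in place, so
   // the response handler stays the single place that removes entries; a late response
   // still finds its entry and no "request not in the buffer" warning is logged.
   final class InflightExpirationSketch {
       private final Map<Long, CompletableFuture<Void>> inflight = new ConcurrentHashMap<>();

       void expire(long correlationId) {
           CompletableFuture<Void> f = inflight.get(correlationId);
           if (f != null) {
               f.completeExceptionally(new TimeoutException("request timed out"));
               // intentionally no inflight.remove(correlationId) here
           }
       }

       void onResponse(long correlationId) {
           inflight.remove(correlationId); // removal happens only on the response path
       }
   }
   ```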


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626748855


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -319,11 +326,11 @@ private void 
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
 if (error == null) {
 result.complete(null);
 } else {
-if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {
-log.debug("Auto-commit sync before revocation timed 
out and won't be retried anymore");
-result.completeExceptionally(error);
-} else if (error instanceof 
UnknownTopicOrPartitionException) {
+if (requestAttempt.isExpired()) {

Review Comment:
   I reverted the code so that we only check `isExpired()` on 
`RetriableException`s and added unit tests to verify behavior. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2148416617

   Hey @kirktrue , I just completed another full pass, left some comments, 
thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626603047


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +614,44 @@ public void testOffsetFetchRequestErroredRequests(final 
Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean 
isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {
+futures.forEach(f -> assertFalse(f.isDone()));
+
+// Insert a long enough sleep to force a timeout of the operation. 
Invoke poll() again so that each
+// OffsetFetchRequestState is evaluated via isExpired().
+time.sleep(defaultApiTimeoutMs);
+commitRequestManager.poll(time.milliseconds());
+futures.forEach(f -> assertFutureThrows(f, 
TimeoutException.class));
+} else {
+futures.forEach(f -> assertFutureThrows(f, KafkaException.class));
+assertEmptyPendingRequests(commitRequestManager);

Review Comment:
   Actually this makes me wonder, are we removing the inflight fetch requests 
when we expire the request? I can only see them removed when we receive a 
response, but not when we intentionally expire the requests (where we remove 
only the unsent)   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626596195


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -613,15 +614,44 @@ public void testOffsetFetchRequestErroredRequests(final 
Errors error, final bool
 }
 }
 
+@ParameterizedTest
+@MethodSource("offsetFetchExceptionSupplier")
+public void testOffsetFetchRequestTimeoutRequests(final Errors error,
+  final boolean 
isRetriable) {
+CommitRequestManager commitRequestManager = create(true, 100);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Set partitions = new HashSet<>();
+partitions.add(new TopicPartition("t1", 0));
+List>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+commitRequestManager,
+partitions,
+1,
+error);
+
+if (isRetriable) {
+futures.forEach(f -> assertFalse(f.isDone()));
+
+// Insert a long enough sleep to force a timeout of the operation. 
Invoke poll() again so that each
+// OffsetFetchRequestState is evaluated via isExpired().
+time.sleep(defaultApiTimeoutMs);
+commitRequestManager.poll(time.milliseconds());
+futures.forEach(f -> assertFutureThrows(f, 
TimeoutException.class));
+} else {
+futures.forEach(f -> assertFutureThrows(f, KafkaException.class));
+assertEmptyPendingRequests(commitRequestManager);

Review Comment:
   I guess we should be able to assert this whether the error is retriable or not 
(it's just that in this test we're forcing a timeout on the retriable ones). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626569362


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -1097,19 +1132,18 @@ private void testNonRetriable(final 
List offsetCommitExceptionSupplier() {
 return Stream.of(
-Arguments.of(Errors.NOT_COORDINATOR),
-Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
-Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
-Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
-Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
-Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
-Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
-Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
-Arguments.of(Errors.REQUEST_TIMED_OUT),
-Arguments.of(Errors.FENCED_INSTANCE_ID),
-Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
-Arguments.of(Errors.STALE_MEMBER_EPOCH),
-Arguments.of(Errors.UNKNOWN_MEMBER_ID));
+Arguments.of(Errors.NOT_COORDINATOR, true),
+Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true),
+Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
+Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false),
+Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
+Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
+Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
+Arguments.of(Errors.REQUEST_TIMED_OUT, false),
+Arguments.of(Errors.FENCED_INSTANCE_ID, false),
+Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.UNKNOWN_MEMBER_ID, false));

Review Comment:
   I would say we don't need to keep/maintain this new argument here to know if 
the exception is retriable. We can simply check `error.exception() instanceof 
RetriableException` when needed?? 
   
   For the record, in the case of the `offsetFetchExceptionSupplier` below, we 
do need the argument because there are retriable exceptions that we don't want 
to consider retriable when fetching offsets (Unknown_topic IIRC)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626569362


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -1097,19 +1132,18 @@ private void testNonRetriable(final 
List offsetCommitExceptionSupplier() {
 return Stream.of(
-Arguments.of(Errors.NOT_COORDINATOR),
-Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
-Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
-Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
-Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
-Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
-Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
-Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
-Arguments.of(Errors.REQUEST_TIMED_OUT),
-Arguments.of(Errors.FENCED_INSTANCE_ID),
-Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
-Arguments.of(Errors.STALE_MEMBER_EPOCH),
-Arguments.of(Errors.UNKNOWN_MEMBER_ID));
+Arguments.of(Errors.NOT_COORDINATOR, true),
+Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true),
+Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
+Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false),
+Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
+Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
+Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
+Arguments.of(Errors.REQUEST_TIMED_OUT, false),
+Arguments.of(Errors.FENCED_INSTANCE_ID, false),
+Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.UNKNOWN_MEMBER_ID, false));

Review Comment:
   I would say we don't need to keep/maintain this new argument here to know if 
the exception is retriable. We can simply check `error.exception() instanceof 
RetriableException` when needed?? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626569362


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -1097,19 +1132,18 @@ private void testNonRetriable(final 
List offsetCommitExceptionSupplier() {
 return Stream.of(
-Arguments.of(Errors.NOT_COORDINATOR),
-Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
-Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
-Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
-Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
-Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
-Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
-Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
-Arguments.of(Errors.REQUEST_TIMED_OUT),
-Arguments.of(Errors.FENCED_INSTANCE_ID),
-Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
-Arguments.of(Errors.STALE_MEMBER_EPOCH),
-Arguments.of(Errors.UNKNOWN_MEMBER_ID));
+Arguments.of(Errors.NOT_COORDINATOR, true),
+Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true),
+Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
+Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false),
+Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
+Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
+Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
+Arguments.of(Errors.REQUEST_TIMED_OUT, false),
+Arguments.of(Errors.FENCED_INSTANCE_ID, false),
+Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.UNKNOWN_MEMBER_ID, false));

Review Comment:
   I would say we don't need to keep/maintain this new argument here to know if 
the exception is retriable. We can simply check `error.exception() instanceof 
RetriableException` when needed. 
   
   For the record, in the case of the `offsetFetchExceptionSupplier` below, we 
do need the argument because there are retriable exceptions that we don't want 
to consider retriable when fetching offsets (Stale member epoch)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626569362


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -1097,19 +1132,18 @@ private void testNonRetriable(final 
List offsetCommitExceptionSupplier() {
 return Stream.of(
-Arguments.of(Errors.NOT_COORDINATOR),
-Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
-Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
-Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
-Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
-Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
-Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
-Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
-Arguments.of(Errors.REQUEST_TIMED_OUT),
-Arguments.of(Errors.FENCED_INSTANCE_ID),
-Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
-Arguments.of(Errors.STALE_MEMBER_EPOCH),
-Arguments.of(Errors.UNKNOWN_MEMBER_ID));
+Arguments.of(Errors.NOT_COORDINATOR, true),
+Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true),
+Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
+Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false),
+Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
+Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
+Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
+Arguments.of(Errors.REQUEST_TIMED_OUT, false),
+Arguments.of(Errors.FENCED_INSTANCE_ID, false),
+Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
+Arguments.of(Errors.UNKNOWN_MEMBER_ID, false));

Review Comment:
   I would say we don't need to keep/maintain this new argument here to know if 
the exception is retriable. We can simply check `error.exception() instanceof 
RetriableException` when needed. 
   
   For the record, in the case of the `offsetFetchExceptionSupplier` we do need 
the argument because there are retriable exceptions that we don't want to 
consider retriable when fetching offsets (Stale member epoch)
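
   As a quick illustration of the suggestion (a sketch of the check, not the actual 
test change):

   ```java
   import org.apache.kafka.common.errors.RetriableException;
   import org.apache.kafka.common.protocol.Errors;

   // Derive retriability from the Errors value itself instead of carrying a second
   // boolean argument through the test parameter supplier.
   final class RetriableCheckSketch {
       static boolean isRetriable(Errors error) {
           // error.exception() is null for Errors.NONE; instanceof treats null as false
           return error.exception() instanceof RetriableException;
       }
   }
   ```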



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626550778


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+private final Timer timer;
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+this.timer = timer;
+}
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final int retryBackoffExpBase,
+ final long retryBackoffMaxMs,
+ final double jitter,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+this.timer = timer;
+}
+
+public boolean isExpired() {
+timer.update();
+return timer.isExpired();
+}
+
+public long remainingMs() {
+timer.update();
+return timer.remainingMs();
+}
+
+public static Timer deadlineTimer(final Time time, final long deadlineMs) {
+long diff = Math.max(0, deadlineMs - time.milliseconds());
+return time.timer(diff);
+}
+
+
+@Override
+protected String toStringBase() {
+return super.toStringBase() + ", timer=" + remainingMs();

Review Comment:
   same as for UnsentRequest's toString: seeing "timer=100" makes it hard to tell 
that 100 is the remaining time in ms. 
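
   For illustration, the analogous rendering on TimedRequestState could look like 
this (a sketch only, mirroring the quoted method above):

   ```java
   @Override
   protected String toStringBase() {
       return super.toStringBase() + ", remainingMs=" + remainingMs();
   }
   ```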



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626544295


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -313,7 +306,7 @@ public String toString() {
 "requestBuilder=" + requestBuilder +
 ", handler=" + handler +
 ", node=" + node +
-", timer=" + timer +
+", timer=" + timer.remainingMs() +

Review Comment:
   ```suggestion
   ", remainingMs=" + timer.remainingMs() +
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626534240


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   uhm well there is a difference: before these changes, the timeout was 
defined at the `NetworkClientDelegate` level and not per request, which made 
sense I would say, since it's the same `REQUEST_TIMEOUT_MS_CONFIG` that needs 
to be applied to all UnsentRequests, so it was applied here on this ln 238 when 
any request was added. 
   
   Now we're saying that each UnsentRequest may have a different timeout, but I see 
we end up with each manager taking that same REQUEST_TIMEOUT_MS_CONFIG 
and passing it to each request. 
   
   Outcome is the same, but actually the original version of applying the 
REQUEST_TIMEOUT_MS_CONFIG to all requests in the network layer seemed simpler I 
would say (unless I'm missing a gain of this option of having the 
REQUEST_TIMEOUT_MS_CONFIG at the managers level). 
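
   For comparison, a rough, simplified sketch of the two shapes being discussed 
(the names and the 30s value are placeholders, not the actual NetworkClientDelegate 
code):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Stand-in for an unsent request; the deadline field plays the role of the Timer.
   final class SketchRequest {
       long deadlineMs;
   }

   // Option A (previous behaviour): the network layer stamps the shared
   // request.timeout.ms onto every request as it is queued.
   final class NetworkLayerStamping {
       private final long requestTimeoutMs = 30_000L; // request.timeout.ms
       private final List<SketchRequest> unsent = new ArrayList<>();

       void add(SketchRequest r) {
           r.deadlineMs = System.currentTimeMillis() + requestTimeoutMs;
           unsent.add(r);
       }
   }

   // Option B (this PR): each request manager builds the request already carrying its
   // own timer, even though today the value is still the same request.timeout.ms.
   final class ManagerStamping {
       private final long requestTimeoutMs = 30_000L;

       SketchRequest build() {
           SketchRequest r = new SketchRequest();
           r.deadlineMs = System.currentTimeMillis() + requestTimeoutMs;
           return r;
       }
   }
   ```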



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626513292


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -875,8 +872,12 @@ private void handleClientResponse(final ClientResponse 
response,
 
 abstract void onResponse(final ClientResponse response);
 
-boolean retryTimeoutExpired(long currentTimeMs) {
-return expirationTimeMs.isPresent() && expirationTimeMs.get() <= 
currentTimeMs;
+/**
+ * If at least one attempt has been sent, and the user-provided 
timeout has elapsed, consider the
+ * request as expired.
+ */
+boolean retryTimeoutExpired() {

Review Comment:
   name nit: this is not only about timeout expired anymore, it considers if 
the request was sent (significant conceptual diff I would say), so what about 
aligning the name? (`sentAndExpired` maybe, or whatever you prefer to better 
reflect what it actually checks now)
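
   A minimal sketch of the semantics being named here (field and method names are 
illustrative only): the request only counts as expired once it has been attempted at 
least once and the user-provided deadline has passed.

   ```java
   // Illustrative only; not the actual RetriableRequestState code.
   final class SentAndExpiredSketch {
       private final long deadlineMs;
       private int numAttempts;

       SentAndExpiredSketch(long nowMs, long timeoutMs) {
           this.deadlineMs = nowMs + timeoutMs;
       }

       void onSendAttempt() {
           numAttempts++;
       }

       boolean sentAndExpired(long nowMs) {
           return numAttempts > 0 && nowMs >= deadlineMs;
       }
   }
   ```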



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626501003


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -516,8 +527,9 @@ private void fetchOffsetsWithRetries(final 
OffsetFetchRequestState fetchRequest,
 result.complete(res);
 } else {
 if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-if (error instanceof TimeoutException && 
fetchRequest.isExpired) {
-result.completeExceptionally(error);
+if (fetchRequest.isExpired()) {
+log.debug("Fetch request for {} timed out and won't be 
retried anymore", fetchRequest.requestedPartitions);

Review Comment:
   nit: let's use "OffsetFetch request" to avoid confusion with Fetch requests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626479394


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends 
AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   hey, I ran it locally again (I only changed TestUtils ln 908 to poll with 
Duration.ZERO as a quick shortcut):
   
   - on trunk: test fails for new consumer only, passes for all other 
combinations
   - on this branch with latest changes: test passes for all combinations (yay!)
   
   Not sure what made it fail for all combinations on your side, but to me this 
PR seems to fix the issue. It's worth double-checking so the test can be added if 
possible, since it's a very good validation of the changes. (Or it's ok to leave it 
out if you prefer, but it's good news that it already seems to work here.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626479394


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends 
AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   hey, I ran it locally again (I only changed TestUtils ln 908 to poll with 
Duration.ZERO as a quick shortcut):
   
   - on trunk: test fails for new consumer only, passes for all other 
combinations
   - on this branch with latest changes: test passes for all combinations (yay!)
   
   Not sure what made it fail for all combinations on your side, but to me this 
PR seems to fix the issue. It's worth double-checking so the test can be added if 
possible, since it's a very good validation of the changes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626395594


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends 
AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   I've removed this for now, because the test won't pass until later PRs are 
merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626391190


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -960,7 +960,7 @@ void maybeReconcile() {
 // best effort to commit the offsets in the case where the epoch might 
have changed while
 // the current reconciliation is in process. Note this is using the 
rebalance timeout as
 // it is the limit enforced by the broker to complete the 
reconciliation process.
-commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   Can we handle this as a separate Jira or even a `[MINOR]` pull request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626390121


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState 
{
  */
 abstract CompletableFuture future();
 
-/**
- * Complete the request future with a TimeoutException if the request 
timeout has been
- * reached, based on the provided current time.
- */
-void maybeExpire(long currentTimeMs) {
-if (retryTimeoutExpired(currentTimeMs)) {
-removeRequest();
-isExpired = true;
-future().completeExceptionally(new 
TimeoutException(requestDescription() +
-" could not complete before timeout expired."));
-}
-}
-
 /**
  * Build request with the given builder, including response handling 
logic.
  */
 NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder builder) {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator());
+coordinatorRequestManager.coordinator(),
+time.timer(requestTimeoutMs)
+);
 request.whenComplete(
 (response, throwable) -> {
 long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   Can we handle this as a separate Jira or even a `[MINOR]` pull request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626375578


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {

Review Comment:
   Added `TimedRequestStateTest`.
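
   For context, a minimal sketch of the kind of test being referred to, assuming the 
constructor and `deadlineTimer` helper shown in the quoted diff (the real 
`TimedRequestStateTest` in the PR may cover more cases and use different names):

   ```java
   package org.apache.kafka.clients.consumer.internals;

   import static org.junit.jupiter.api.Assertions.assertFalse;
   import static org.junit.jupiter.api.Assertions.assertTrue;

   import org.apache.kafka.common.utils.LogContext;
   import org.apache.kafka.common.utils.MockTime;
   import org.apache.kafka.common.utils.Time;
   import org.junit.jupiter.api.Test;

   public class TimedRequestStateSketchTest {

       @Test
       public void testIsExpiredOnlyAfterDeadlinePasses() {
           Time time = new MockTime();
           long deadlineMs = time.milliseconds() + 100;

           TimedRequestState state = new TimedRequestState(
               new LogContext(),
               "TimedRequestStateSketchTest",
               100,   // retryBackoffMs
               1000,  // retryBackoffMaxMs
               TimedRequestState.deadlineTimer(time, deadlineMs));

           // Before the deadline the request is not expired.
           assertFalse(state.isExpired());

           // Advance the mock clock past the deadline; isExpired() updates the timer internally.
           time.sleep(101);
           assertTrue(state.isExpired());
       }
   }
   ```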



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1280,7 +1280,7 @@ void prepareShutdown(final Timer timer, final 
AtomicReference firstEx
 if (autoCommitEnabled)
 autoCommitSync(timer);
 
-applicationEventHandler.add(new CommitOnCloseEvent());
+applicationEventHandler.add(new 
CommitOnCloseEvent(calculateDeadlineMs(timer)));

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626339539


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1280,7 +1280,7 @@ void prepareShutdown(final Timer timer, final 
AtomicReference firstEx
 if (autoCommitEnabled)
 autoCommitSync(timer);
 
-applicationEventHandler.add(new CommitOnCloseEvent());
+applicationEventHandler.add(new 
CommitOnCloseEvent(calculateDeadlineMs(timer)));

Review Comment:
   You're right. Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626331279


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+private final Timer timer;
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+this.timer = timer;
+}
+
+TimedRequestState(final LogContext logContext,

Review Comment:
   I made this `public` to match the other.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {

Review Comment:
   Will do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626329144


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+private final Timer timer;
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+this.timer = timer;
+}
+
+TimedRequestState(final LogContext logContext,
+  final String owner,
+  final long retryBackoffMs,
+  final int retryBackoffExpBase,
+  final long retryBackoffMaxMs,
+  final double jitter,
+  final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+this.timer = timer;
+}
+
+public boolean isExpired() {
+timer.update();
+return timer.isExpired();
+}
+
+public long remainingMs() {
+timer.update();
+return timer.remainingMs();
+}
+
+public static Timer deadlineTimer(final Time time, final long deadlineMs) {
+// Prevent the timer from being negative.

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626327558


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends 
AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   I updated the code to allow us to pass in a duration of 0, and it doesn't 
work for `CONSUMER` _or_ `CLASSIC` 🤔 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -469,7 +474,7 @@ private Throwable commitAsyncExceptionForError(Throwable 
error) {
  * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
  *
  * @param partitions   Partitions to fetch offsets for.
- * @param deadlineMs   Time until which the request should be retried 
if it fails

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626328276


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -465,39 +474,39 @@ private Throwable commitAsyncExceptionForError(Throwable 
error) {
  * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
  *
  * @param partitions   Partitions to fetch offsets for.
- * @param expirationTimeMs Time until which the request should be retried 
if it fails
+ * @param deadlineMs Time until which the request should be retried if it 
fails

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626327813


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -391,7 +398,7 @@ public CompletableFuture commitAsync(final 
Map

Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626193338


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -960,7 +960,7 @@ void maybeReconcile() {
 // best effort to commit the offsets in the case where the epoch might 
have changed while
 // the current reconciliation is in process. Note this is using the 
rebalance timeout as
 // it is the limit enforced by the broker to complete the 
reconciliation process.
-commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   Ah, I see! That makes sense! Then I propose to rename the field to something 
like `commitTimeoutDuringReconciliation` to make that clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626188794


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1280,7 +1280,7 @@ void prepareShutdown(final Timer timer, final 
AtomicReference firstEx
 if (autoCommitEnabled)
 autoCommitSync(timer);
 
-applicationEventHandler.add(new CommitOnCloseEvent());
+applicationEventHandler.add(new 
CommitOnCloseEvent(calculateDeadlineMs(timer)));

Review Comment:
   I don't quite get why we need this event as completable with a deadline. We 
issued a blocking commit sync above on ln 1281 (that one needs deadline, and it 
has it from the timer, and will be expired if needed etc.). But this 
`CommitOnCloseEvent` is only flipping a flag in the commit manager to not allow 
any more commits 
([this](https://github.com/apache/kafka/blob/55d38efcc5505a5a1bddb08ba05f4d923f8050f9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L187)).
 I may be missing something, but this is not the kind of event that needs a 
deadline to complete or be reaped (aka. CompletableApplicationEvent)
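
   As a rough illustration of that point (the flag and method names here are assumed 
for the sketch, not the actual CommitRequestManager members): a fire-and-forget event 
like this only needs to toggle state in the background thread, so nothing waits on a 
result that would require a deadline.

   ```java
   // Illustrative only.
   final class CommitOnCloseSketch {
       private volatile boolean closing = false;

       // What the background thread would do when it processes the event.
       void onCommitOnCloseEvent() {
           closing = true; // stop accepting new commit requests from now on
       }

       boolean acceptsNewCommits() {
           return !closing;
       }
   }
   ```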



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1626147694


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -960,7 +960,7 @@ void maybeReconcile() {
 // best effort to commit the offsets in the case where the epoch might 
have changed while
 // the current reconciliation is in process. Note this is using the 
rebalance timeout as
 // it is the limit enforced by the broker to complete the 
reconciliation process.
-commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   Hey @cadonna, your assumption is right: "the whole reconciliation is limited 
by the rebalance timeout", but the trick is that it's the broker who enforces 
that, not the client.
   
   That's why we don't see the `rebalanceTimeout` being used on the client 
side, other than for sending it to the broker in the initial HB, and here to 
limit the commit retries. We use it on the commit only because we needed to put 
a limit on the retries to make sure that if the commit keeps failing and being 
retried, the request is not kept forever on the client after the broker kicks 
the member out of the group. 
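
   A small sketch of how that limit turns into a deadline for the pre-revocation 
commit (the helper name follows the quoted diff; its body here is an assumption):

   ```java
   import org.apache.kafka.common.utils.Time;

   // The client does not enforce the rebalance timeout itself; it only uses it to bound
   // how long the auto-commit before revocation may keep being retried.
   final class ReconciliationCommitDeadlineSketch {
       private final Time time;

       ReconciliationCommitDeadlineSketch(Time time) {
           this.time = time;
       }

       long getDeadlineMsForTimeout(long rebalanceTimeoutMs) {
           return time.milliseconds() + rebalanceTimeoutMs;
       }
   }
   ```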



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1625726216


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -319,11 +326,11 @@ private void 
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
 if (error == null) {
 result.complete(null);
 } else {
-if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {
-log.debug("Auto-commit sync before revocation timed 
out and won't be retried anymore");
-result.completeExceptionally(error);
-} else if (error instanceof 
UnknownTopicOrPartitionException) {
+if (requestAttempt.isExpired()) {

Review Comment:
   Follow-up to 
https://github.com/apache/kafka/pull/16031#discussion_r1624620726
   
   I am not sure if this is good. Let's assume I get a non-retriable error AND 
the timeout was exceeded. Do I really want to get a timeout exception wrapped 
around the non-retriable error? I want a timeout exception when I can improve 
the situation by increasing the timeout. If I get a non-retriable error, I 
cannot improve the situation by increasing the timeout.
   So, I would only throw a timeout exception when I get a retriable error.
   WDYT?
   
   This affects also the other location where the expiration is verified.
   
   Either way, tests are required that test the combination of (non-)retriable 
error and expiration.
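
   A condensed sketch of the error-handling shape being argued for (simplified; the 
real handler also deals with the stale-epoch and unknown-topic cases):

   ```java
   import java.util.concurrent.CompletableFuture;

   import org.apache.kafka.common.errors.RetriableException;
   import org.apache.kafka.common.errors.TimeoutException;

   // Only a retriable error that has run out of time becomes a TimeoutException;
   // a non-retriable error is surfaced as-is, since a longer timeout would not help.
   final class CommitErrorHandlingSketch {

       void handle(Throwable error, boolean expired, CompletableFuture<Void> result) {
           if (error instanceof RetriableException) {
               if (expired) {
                   result.completeExceptionally(
                       new TimeoutException("commit could not complete before timeout expired"));
               }
               // Otherwise leave the future open and retry after backoff.
           } else {
               result.completeExceptionally(error); // non-retriable: report the real cause
           }
       }
   }
   ```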



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends 
AbstractConsumerTest {
 runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   I do not understand this comment. Could you please elaborate?
   If I look where the poll is done, a duration of 100 ms is used.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -465,39 +474,39 @@ private Throwable commitAsyncExceptionForError(Throwable 
error) {
  * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
  *
  * @param partitions   Partitions to fetch offsets for.
- * @param expirationTimeMs Time until which the request should be retried 
if it fails
+ * @param deadlineMs Time until which the request should be retried if it 
fails

Review Comment:
   nit: the indentation is not consistent



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -391,7 +398,7 @@ public CompletableFuture commitAsync(final Map


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {

Review Comment:
   Could you add unit tests for this class?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track
+ * of the request's expiration.
+ */

Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-04 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1625709036


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -960,7 +960,7 @@ void maybeReconcile() {
 // best effort to commit the offsets in the case where the epoch might 
have changed while
 // the current reconciliation is in process. Note this is using the 
rebalance timeout as
 // it is the limit enforced by the broker to complete the 
reconciliation process.
-commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   My assumption was that the whole reconciliation is limited by the rebalance 
timeout. But here it seems that we only have a timeout on the auto commit sync. 
I would have expected that different parts of the reconciliation would update a 
timer and the commit sync would take the remaining time as the timeout.
But maybe I'm missing something.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2146294482

   @lianetm @cadonna—this is ready for another review. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624997659


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -747,21 +748,19 @@ public void 
testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() {
 new TopicPartition("topic", 1),
 new OffsetAndMetadata(0));
 
-// Send sync offset commit request that fails with retriable error.
-long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
-completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
+// Send sync offset commit request.
+long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
+CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
 
-// Request retried after backoff, and fails with retriable again. 
Should not complete yet
+// Make the first request fail with a retriable error. Should not 
complete yet
 // given that the request timeout hasn't expired.
 time.sleep(retryBackoffMs);
 completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
 assertFalse(commitResult.isDone());
 
 // Sleep to expire the request timeout. Request should fail on the 
next poll.
 time.sleep(retryBackoffMs);
-NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
-assertEquals(0, res.unsentRequests.size());
+completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);

Review Comment:
   This code is reverted back to the original form now that we prune expired 
requests that have been attempted at least once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624996705


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest 
createUnsentRequest(
 private void handleError(final Throwable exception,
  final long completionTimeMs) {
 if (exception instanceof RetriableException) {
-if (completionTimeMs >= expirationTimeMs) {
+if (isExpired()) {

Review Comment:
   I refactored the code so that `handleResponse()` calls `handleError()` if it 
hits an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624992826


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -61,30 +62,28 @@
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
+private final Time time;
 private final boolean allowAutoTopicCreation;
 private final List inflightRequests;
+private final int requestTimeoutMs;
 private final long retryBackoffMs;
 private final long retryBackoffMaxMs;
 private final Logger log;
 private final LogContext logContext;
 
-public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+public TopicMetadataRequestManager(final LogContext context, final Time 
time, final ConsumerConfig config) {
 logContext = context;
 log = logContext.logger(getClass());
+this.time = time;
 inflightRequests = new LinkedList<>();
+requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
 }
 
 @Override
 public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-// Prune any requests which have timed out
-List expiredRequests = 
inflightRequests.stream()
-.filter(req -> req.isExpired(currentTimeMs))
-.collect(Collectors.toList());
-expiredRequests.forEach(TopicMetadataRequestState::expire);
-

Review Comment:
   I've updated the logic in `CommitRequestManager` and 
`TopicMetadataRequestManager` to proactively prune expired `RequestState`s that 
have had at least one request.
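   A minimal, self-contained sketch of that pruning rule (expire only requests that 
are past their deadline and have already been attempted at least once); the 
`InflightRequest` type and its fields are invented for illustration and are not the 
actual manager code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.TimeoutException;

public class PruneSketch {

    // Invented stand-in for the manager's per-request state.
    static final class InflightRequest {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long deadlineMs;
        int attempts;                   // incremented each time the request is actually sent

        InflightRequest(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    // Expire only requests that are past their deadline AND were attempted at least once.
    static void prune(List<InflightRequest> inflight, long currentTimeMs) {
        inflight.removeIf(req -> {
            boolean expired = req.attempts > 0 && currentTimeMs >= req.deadlineMs;
            if (expired) {
                req.future.completeExceptionally(new TimeoutException(
                    "request could not complete before timeout expired"));
            }
            return expired;
        });
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<InflightRequest> inflight = new ArrayList<>();

        InflightRequest neverSent = new InflightRequest(now - 1);   // expired but never attempted
        InflightRequest sentOnce = new InflightRequest(now - 1);    // expired and attempted
        sentOnce.attempts = 1;
        inflight.add(neverSent);
        inflight.add(sentOnce);

        prune(inflight, now);
        System.out.println("still tracked: " + inflight.size());    // 1, only the never-sent request
        System.out.println("sentOnce timed out: " + sentOnce.future.isCompletedExceptionally());
    }
}
```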



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624957099


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState 
{
  */
 abstract CompletableFuture future();
 
-/**
- * Complete the request future with a TimeoutException if the request 
timeout has been
- * reached, based on the provided current time.
- */
-void maybeExpire(long currentTimeMs) {
-if (retryTimeoutExpired(currentTimeMs)) {
-removeRequest();
-isExpired = true;
-future().completeExceptionally(new 
TimeoutException(requestDescription() +
-" could not complete before timeout expired."));
-}
-}
-
 /**
  * Build request with the given builder, including response handling 
logic.
  */
 NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder builder) {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator());
+coordinatorRequestManager.coordinator(),
+time.timer(requestTimeoutMs)
+);
 request.whenComplete(
 (response, throwable) -> {
 long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   Agree that it's confusing, but for the record, I guess the current in the 
name may come from the point of view that this is the moment a request 
completes and we retrieve the completion time, so I could see it as the 
current because of where it's called (but still +1 for a better name, simply 
`completionTimeMs`, which btw aligns with the `handleClientResponse` func param 
where it's used right below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624788878


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   It _shouldn't_. My preference is to make data as immutable as possible. 
Setting the `Timer` on creation vs. after the fact seems cleaner.
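   A small sketch of that preference, with a hypothetical `Request` holder (not the 
real `UnsentRequest`): the `Timer` is fixed at construction, so no later 
`setTimer(...)` call can leave the object half-initialized:

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class ImmutableTimerSketch {

    // Hypothetical request holder; the Timer is supplied once, at construction.
    static final class Request {
        private final Timer timer;

        Request(Timer timer) {
            this.timer = timer;
        }

        Timer timer() {
            return timer;
        }
    }

    public static void main(String[] args) {
        Time time = Time.SYSTEM;
        Request request = new Request(time.timer(30_000));
        System.out.println("timeout: " + request.timer().timeoutMs() + " ms");
    }
}
```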



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624815072


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+private final Timer timer;
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+this.timer = timer;
+}
+
+// Visible for testing

Review Comment:
   I removed the false comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624812730


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest 
createUnsentRequest(
 private void handleError(final Throwable exception,
  final long completionTimeMs) {
 if (exception instanceof RetriableException) {
-if (completionTimeMs >= expirationTimeMs) {
+if (isExpired()) {

Review Comment:
   I will take a look at refactoring this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624804702


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState 
{
  */
 abstract CompletableFuture future();
 
-/**
- * Complete the request future with a TimeoutException if the request 
timeout has been
- * reached, based on the provided current time.
- */
-void maybeExpire(long currentTimeMs) {
-if (retryTimeoutExpired(currentTimeMs)) {
-removeRequest();
-isExpired = true;
-future().completeExceptionally(new 
TimeoutException(requestDescription() +
-" could not complete before timeout expired."));
-}
-}
-
 /**
  * Build request with the given builder, including response handling 
logic.
  */
 NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder builder) {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator());
+coordinatorRequestManager.coordinator(),
+time.timer(requestTimeoutMs)
+);
 request.whenComplete(
 (response, throwable) -> {
 long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   I don't know 🤷‍♂️ 
   
   Every so often I think about changing it, but I generally don't include 
changes that are unrelated to my PR.
   
   Would you like me to change it in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624803424


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+private final Timer timer;
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+this.timer = timer;
+}
+
+// Visible for testing

Review Comment:
   Good catch! You're right!
   
   Where that constructor is called, there is a comment that it's for testing 
only, so I applied that same comment in `TimedRequestState`. But the existing 
comment is a lie, so my code perpetuates it. 🤦‍♂️
   
   I'll remove it from the new code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624801103


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -960,7 +960,7 @@ void maybeReconcile() {
 // best effort to commit the offsets in the case where the epoch might 
have changed while
 // the current reconciliation is in process. Note this is using the 
rebalance timeout as
 // it is the limit enforced by the broker to complete the 
reconciliation process.
-commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   I might misunderstand your question—are you suggesting to pass in the 
_timeout_ vs. the _deadline_? There's no `Timer` in the 
`getDeadlineMsForTimeout`. It just calculates the deadline from the timeout and 
handles overflow.
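   A minimal sketch of that calculation, assuming only what is described here (add the 
timeout to the current time and clamp on overflow); the method name mirrors the one 
under discussion but this is not the actual implementation:

```java
public class DeadlineSketch {

    // Compute deadline = now + timeout, clamping to Long.MAX_VALUE if the addition overflows.
    static long getDeadlineMsForTimeout(long currentTimeMs, long timeoutMs) {
        long deadlineMs = currentTimeMs + timeoutMs;
        return deadlineMs < 0 ? Long.MAX_VALUE : deadlineMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(getDeadlineMsForTimeout(now, 30_000));        // ordinary deadline
        System.out.println(getDeadlineMsForTimeout(now, Long.MAX_VALUE)); // overflow, clamped
    }
}
```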



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624792665


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -61,30 +62,28 @@
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
+private final Time time;
 private final boolean allowAutoTopicCreation;
 private final List inflightRequests;
+private final int requestTimeoutMs;
 private final long retryBackoffMs;
 private final long retryBackoffMaxMs;
 private final Logger log;
 private final LogContext logContext;
 
-public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+public TopicMetadataRequestManager(final LogContext context, final Time 
time, final ConsumerConfig config) {
 logContext = context;
 log = logContext.logger(getClass());
+this.time = time;
 inflightRequests = new LinkedList<>();
+requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
 }
 
 @Override
 public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-// Prune any requests which have timed out
-List expiredRequests = 
inflightRequests.stream()
-.filter(req -> req.isExpired(currentTimeMs))
-.collect(Collectors.toList());
-expiredRequests.forEach(TopicMetadataRequestState::expire);
-

Review Comment:
   It makes sense to have `poll()` expire requests that are _in flight_. I 
don't think `poll()` should expire _unsent_ requests, because we want to let 
them have a chance to run at least once. I'll change the logic to have `poll()` 
expire `Future`s related to requests that have been sent but that are now 
expired.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624788878


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   It _shouldn't_. My preference is to make data as immutable as possible. 
Setting the `Timer` on creation vs. after the fact seems cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624620726


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState 
requestAttempt,
 result.complete(null);
 } else {
 if (error instanceof RetriableException) {
-if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {

Review Comment:
   If I remember right, that was needed at some point to avoid retrying on 
[this](https://github.com/apache/kafka/blob/a68a1cce824a8346509d5194e0e43a3cb36ba09a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L838)
 `TimeoutException` internally generated by the manager when expiring requests 
on poll (vs retrying on the request `TimeoutException` received as a response 
if the api level timeout was still not expired). But with this PR the manager 
does not do that anymore, so agree we should check isExpired and fail, no 
matter the exception. 
   
   On top of that valid point from @cadonna, I think there's more to this. 
Before, the manager was internally throwing a TimeoutException, so at this 
point we could simply completeExceptionally with the same error (ln 441). But 
this also changes now. Similar to how the `TopicMetadataManager` does, if at 
this point we see that the request isExpired, I guess we need to throw a new 
TimeoutException (not the last known error, which is what is thrown now in ln 
441)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624694081


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -747,21 +748,19 @@ public void 
testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() {
 new TopicPartition("topic", 1),
 new OffsetAndMetadata(0));
 
-// Send sync offset commit request that fails with retriable error.
-long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
-completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
+// Send sync offset commit request.
+long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
+CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
 
-// Request retried after backoff, and fails with retriable again. 
Should not complete yet
+// Make the first request fail with a retriable error. Should not 
complete yet
 // given that the request timeout hasn't expired.
 time.sleep(retryBackoffMs);
 completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
 assertFalse(commitResult.isDone());
 
 // Sleep to expire the request timeout. Request should fail on the 
next poll.
 time.sleep(retryBackoffMs);
-NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
-assertEquals(0, res.unsentRequests.size());
+completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);

Review Comment:
   Uhm, this is exactly my concern on the [comment 
above](https://github.com/apache/kafka/pull/16031/files#r1624689065). Before 
this PR, poll would throw a timeout because it knew that the call to commitSync 
had a deadline that expired. Now it requires a response to the request saying 
that it timed out. Same outcome, but it needs to sit and wait for something it 
already knows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624689065


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -61,30 +62,28 @@
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
+private final Time time;
 private final boolean allowAutoTopicCreation;
 private final List inflightRequests;
+private final int requestTimeoutMs;
 private final long retryBackoffMs;
 private final long retryBackoffMaxMs;
 private final Logger log;
 private final LogContext logContext;
 
-public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+public TopicMetadataRequestManager(final LogContext context, final Time 
time, final ConsumerConfig config) {
 logContext = context;
 log = logContext.logger(getClass());
+this.time = time;
 inflightRequests = new LinkedList<>();
+requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
 }
 
 @Override
 public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-// Prune any requests which have timed out
-List expiredRequests = 
inflightRequests.stream()
-.filter(req -> req.isExpired(currentTimeMs))
-.collect(Collectors.toList());
-expiredRequests.forEach(TopicMetadataRequestState::expire);
-

Review Comment:
   I totally agree that we shouldn't prune expired requests on poll before 
returning the requests (because that does not give requests with timeout 0 the 
single chance to run that we wanted). But not pruning at all on poll goes a bit 
farther than what I was expecting.  
   
   It seems conceptually wrong that the manager keeps requests that it has 
sent out and knows are expired. It then relies on a timeout response to 
expire the request; the outcome is the same, but the timing is very 
different: the manager will wait for the full request_timeout before throwing a 
`TimeoutException` for a request that it knew was expired from the moment 
it first polled it, right? (All that time it will keep the request in 
its queue as in-flight, skipping it on every poll just because it hasn't 
received a response.) I expect that tests that throw a timeout on poll would fail 
with this, and they would need to wait for a network timeout response that was 
not needed before so that the manager actually throws the TimeoutException; that's 
what I'm referring to. 
   
   Managers already apply expiration logic to requests:
   1. on poll (before returning the requests) and on response (before this PR)
   2. on response only (this PR)
   3. on poll (after returning the requests) and on responses  -> would this be 
a sensible compromise, so that we end up with managers that apply the 
expiration logic consistently wherever they can? It's closer to what we had 
before this PR, and also fixes the fire-and-forget behaviour we were missing. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624689065


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -61,30 +62,28 @@
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
+private final Time time;
 private final boolean allowAutoTopicCreation;
 private final List inflightRequests;
+private final int requestTimeoutMs;
 private final long retryBackoffMs;
 private final long retryBackoffMaxMs;
 private final Logger log;
 private final LogContext logContext;
 
-public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+public TopicMetadataRequestManager(final LogContext context, final Time 
time, final ConsumerConfig config) {
 logContext = context;
 log = logContext.logger(getClass());
+this.time = time;
 inflightRequests = new LinkedList<>();
+requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
 }
 
 @Override
 public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-// Prune any requests which have timed out
-List expiredRequests = 
inflightRequests.stream()
-.filter(req -> req.isExpired(currentTimeMs))
-.collect(Collectors.toList());
-expiredRequests.forEach(TopicMetadataRequestState::expire);
-

Review Comment:
   I totally agree that we shouldn't prune expired requests on poll before 
returning the requests (because that does not give requests with timeout 0 the 
single chance to run that we wanted). But not pruning at all on poll goes a bit 
farther than what I was expecting.  
   
   It seems conceptually wrong that the manager keeps requests that it has 
sent out and knows are expired. It then relies on a timeout response to 
expire the request; the outcome is the same, but the timing is very 
different: the manager will wait for the full request_timeout before throwing a 
`TimeoutException` for a request that it knew was expired from the moment 
it first polled it, right? (All that time it will keep the request in 
its queue as in-flight, skipping it on every poll just because it hasn't 
received a response.) I expect that tests that throw a timeout on poll would fail 
with this, and they would need to wait for a network timeout response that was 
not needed before so that the manager actually throws the TimeoutException; that's 
what I'm referring to. 
   
   Managers already apply expiration logic to requests:
   1. on poll (before returning the requests) and on response (before this PR)
   2. on response only (this PR)
   3. on poll (after returning the requests) and on responses  -> would this be 
a sensible compromise, so that we end up with managers that apply the 
expiration logic consistently wherever they can? It's closer to what we had 
before this PR, and also fixes the fire-and-forget behaviour we were missing. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624620726


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState 
requestAttempt,
 result.complete(null);
 } else {
 if (error instanceof RetriableException) {
-if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {

Review Comment:
   If I remember right, that was needed at some point to avoid retrying on 
[this](https://github.com/apache/kafka/blob/a68a1cce824a8346509d5194e0e43a3cb36ba09a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L838)
 `TimeoutException` internally generated by the manager when expiring requests 
on poll (vs retrying on the request `TimeoutException` received as a response 
if the api level timeout was still not expired). But with this PR the manager 
does not do that anymore, so agree we should check isExpired and fail, no 
matter the exception. 
   
   Note that there's more to this. Before, the manager was internally throwing a 
TimeoutException, so at this point we could simply completeExceptionally with 
the same error (ln 441). But this also changes now. Similar to how the 
`TopicMetadataManager` does, if at this point we see that the request 
isExpired, I guess we need to throw a new TimeoutException (not the last known 
error, which is what is thrown now in ln 441)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624620726


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState 
requestAttempt,
 result.complete(null);
 } else {
 if (error instanceof RetriableException) {
-if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {

Review Comment:
   If I remember right, that was needed at some point to avoid retrying on 
[this](https://github.com/apache/kafka/blob/a68a1cce824a8346509d5194e0e43a3cb36ba09a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L838)
 `TimeoutException` internally generated by the manager when expiring requests 
on poll (vs retrying on the request `TimeoutException` received as a response 
if the api level timeout was still not expired). But with this PR the manager 
does not do that anymore, so agree we should check isExpired and fail, no 
matter the exception. 
   
   Note that there's more to this. Before, the manager internally threw a 
TimeoutException, so at this point we could simply completeExceptionally with 
the error (ln 441). But this also changes now. Similar to how the 
`TopicMetadataManager` does, if at this point we see that the request 
isExpired, I guess we need to throw a new TimeoutException (not the last known 
error, which is what is thrown now in ln 441)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624549518


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest 
createUnsentRequest(
 private void handleError(final Throwable exception,
  final long completionTimeMs) {
 if (exception instanceof RetriableException) {
-if (completionTimeMs >= expirationTimeMs) {
+if (isExpired()) {

Review Comment:
   This logic of what we want to do on retriable errors seems to be the same 
for all retriable errors (no matter whether they are found in `handleError` or 
`handleResponse`). Could we maybe consolidate it in a single place, e.g. 
`maybeExpireOnRetriable` or something like it, so that it would be consistently 
applied whenever we need to maybe expire?  
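   To make the suggestion concrete, a rough sketch of such a helper, using the 
reviewer's proposed name; the parameters and surrounding types are simplified 
stand-ins, not the managers' actual code:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Timer;

public class RetriableExpirySketch {

    /**
     * Returns true if the failed attempt should be retried. If the request's timer
     * has expired, the future is completed with a TimeoutException instead.
     */
    static <T> boolean maybeExpireOnRetriable(Throwable error,
                                              Timer timer,
                                              long completionTimeMs,
                                              CompletableFuture<T> future,
                                              String requestDescription) {
        if (!(error instanceof RetriableException))
            return false;                        // non-retriable errors are handled elsewhere

        timer.update(completionTimeMs);
        if (timer.isExpired()) {
            future.completeExceptionally(new TimeoutException(
                requestDescription + " could not complete before timeout expired."));
            return false;
        }
        return true;                             // still within the deadline: retry after backoff
    }
}
```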



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-06-03 Thread via GitHub


cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624238381


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState 
{
  */
 abstract CompletableFuture future();
 
-/**
- * Complete the request future with a TimeoutException if the request 
timeout has been
- * reached, based on the provided current time.
- */
-void maybeExpire(long currentTimeMs) {
-if (retryTimeoutExpired(currentTimeMs)) {
-removeRequest();
-isExpired = true;
-future().completeExceptionally(new 
TimeoutException(requestDescription() +
-" could not complete before timeout expired."));
-}
-}
-
 /**
  * Build request with the given builder, including response handling 
logic.
  */
 NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder builder) {
 NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator());
+coordinatorRequestManager.coordinator(),
+time.timer(requestTimeoutMs)
+);
 request.whenComplete(
 (response, throwable) -> {
 long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   Not directly related to this PR: Why is this variable called `currentTimeMs`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+private final Timer timer;
+
+public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+this.timer = timer;
+}
+
+// Visible for testing

Review Comment:
   This does not seem to be true. The constructor is not called in tests and 
also the constructors that call this constructor are not called in tests. That 
is one reason I do not like these kinds of comments, because they easily start to 
lie. 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   I guess it does not make a big difference if the timer is set when the 
unsent request is created vs. when the unsent request is added to the network 
client, right? 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState 
requestAttempt,
 result.complete(null);
 } else {
 if (error instanceof RetriableException) {
-if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {

Review Comment:
   Why do we only check expiration in case of a `TimeoutException` but not for 
other retriable exceptions?
   The `TopicMetadataRequestManager` handles this differently.



##
c

Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager 
implementations respect user-provided timeout
URL: https://github.com/apache/kafka/pull/16031


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558404

I'm going to close and reopen to force another build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager 
implementations respect user-provided timeout
URL: https://github.com/apache/kafka/pull/16031


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558063

   @lianetm—relevant test failures have been addressed. There are three 
unrelated test failures from flaky tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-29 Thread via GitHub


lianetm commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2137670350

   Hey @kirktrue, thanks for the changes, I'll take a look. But I noticed that 
several consumer integration tests related to commits are failing here; are we still 
missing changes/fixes maybe?  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-28 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1618010550


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -133,43 +131,6 @@ public void 
testAllTopicsExceptionAndInflightRequests(final Errors error, final
 }
 }
 
-@Test
-public void testExpiringRequest() {
-String topic = "hello";
-
-// Request topic metadata with 1000ms expiration
-long now = this.time.milliseconds();
-CompletableFuture>> future =
-this.topicMetadataRequestManager.requestTopicMetadata(topic, now + 
1000L);
-assertEquals(1, 
this.topicMetadataRequestManager.inflightRequests().size());
-
-// Poll the request manager to get the list of requests to send
-// - fail the request with a RetriableException
-NetworkClientDelegate.PollResult res = 
this.topicMetadataRequestManager.poll(this.time.milliseconds());
-assertEquals(1, res.unsentRequests.size());
-
res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
-res.unsentRequests.get(0),
-topic,
-Errors.REQUEST_TIMED_OUT));
-
-// Sleep for long enough to exceed the backoff delay but still within 
the expiration
-// - fail the request again with a RetriableException
-this.time.sleep(500);
-res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
-assertEquals(1, res.unsentRequests.size());
-
res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
-res.unsentRequests.get(0),
-topic,
-Errors.REQUEST_TIMED_OUT));
-
-// Sleep for long enough to expire the request which should fail
-this.time.sleep(1000);
-res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
-assertEquals(0, res.unsentRequests.size());
-assertEquals(0, 
this.topicMetadataRequestManager.inflightRequests().size());
-assertTrue(future.isCompletedExceptionally());
-}
-

Review Comment:
   This test exercises the preemptive pruning, which we no longer use.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-28 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1618010264


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -123,23 +122,6 @@ public void testEnsureCorrectCompletionTimeOnComplete() {
 assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
 }
 
-@Test
-public void testEnsureTimerSetOnAdd() {
-NetworkClientDelegate ncd = newNetworkClientDelegate();
-NetworkClientDelegate.UnsentRequest findCoordRequest = 
newUnsentFindCoordinatorRequest();
-assertNull(findCoordRequest.timer());
-
-// NetworkClientDelegate#add
-ncd.add(findCoordRequest);
-assertEquals(1, ncd.unsentRequests().size());
-assertEquals(REQUEST_TIMEOUT_MS, 
ncd.unsentRequests().poll().timer().timeoutMs());
-
-// NetworkClientDelegate#addAll
-ncd.addAll(Collections.singletonList(findCoordRequest));
-assertEquals(1, ncd.unsentRequests().size());
-assertEquals(REQUEST_TIMEOUT_MS, 
ncd.unsentRequests().poll().timer().timeoutMs());
-}
-

Review Comment:
   The `Timer` is provided via the constructor, so this test is no longer 
needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-28 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1618009815


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -779,16 +778,17 @@ public void 
testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExp
 new TopicPartition("topic", 1),
 new OffsetAndMetadata(0));
 
-// Send offset commit request that fails with retriable error.
-long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
+// Send offset commit request.
+long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
+CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
+
+// Make the first request fail due to a missing coordinator.

Review Comment:
   Same here as above: updated this code now that we don't preemptively prune 
expired requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-28 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1618009570


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -747,21 +748,19 @@ public void 
testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() {
 new TopicPartition("topic", 1),
 new OffsetAndMetadata(0));
 
-// Send sync offset commit request that fails with retriable error.
-long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
-completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
+// Send sync offset commit request.
+long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
+CompletableFuture commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);

Review Comment:
   This logic needed to be revised as the previous code relied on the 
preemptive pruning of expired requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-28 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1618009040


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -61,30 +62,28 @@
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
+private final Time time;
 private final boolean allowAutoTopicCreation;
 private final List inflightRequests;
+private final int requestTimeoutMs;
 private final long retryBackoffMs;
 private final long retryBackoffMaxMs;
 private final Logger log;
 private final LogContext logContext;
 
-public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+public TopicMetadataRequestManager(final LogContext context, final Time 
time, final ConsumerConfig config) {
 logContext = context;
 log = logContext.logger(getClass());
+this.time = time;
 inflightRequests = new LinkedList<>();
+requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
 }
 
 @Override
 public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-// Prune any requests which have timed out
-List expiredRequests = 
inflightRequests.stream()
-.filter(req -> req.isExpired(currentTimeMs))
-.collect(Collectors.toList());
-expiredRequests.forEach(TopicMetadataRequestState::expire);
-

Review Comment:
   As with `CommitRequestManager`, we don't proactively prune requests. 
Instead, we wait for the `NetworkClient` to invoke our callback with a 
`TimeoutException` so there's a single path.
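   As a tiny illustration of that single path (the simulated network layer here is 
just a `CompletableFuture`, not the real `NetworkClient`): success, fatal errors and 
timeouts all arrive through the same completion callback:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.TimeoutException;

public class SinglePathSketch {
    public static void main(String[] args) {
        CompletableFuture<String> response = new CompletableFuture<>();

        // Single completion path: success, fatal error and timeout all flow through here.
        response.whenComplete((value, error) -> {
            if (error instanceof TimeoutException) {
                System.out.println("request expired: " + error.getMessage());
            } else if (error != null) {
                System.out.println("request failed: " + error);
            } else {
                System.out.println("request succeeded: " + value);
            }
        });

        // Stand-in for the network layer timing the request out.
        response.completeExceptionally(new TimeoutException("request timed out"));
    }
}
```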



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-28 Thread via GitHub


kirktrue commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1618007950


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -235,7 +235,6 @@ public void addAll(final List requests) {
 
 public void add(final UnsentRequest r) {
 Objects.requireNonNull(r);
-r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   We provide the `Timer` when creating the `UnsentRequest`, so we don't need 
to add it afterward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


