Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2154323060

Merged to `trunk` and cherry-picked to `3.8` 🥳
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna merged PR #16031: URL: https://github.com/apache/kafka/pull/16031
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628473245

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, driven by `offsetFetchExceptionSupplier`, up to the `if (isRetriable) {` branch)

Review Comment:
If I add that to `offsetExceptionSupplier()` and `isRetriableOnOffsetFetch()`, the relevant tests fail 🤔

That error is being tested in `testOffsetFetchRequestPartitionDataError()` via its supplier `partitionDataErrorSupplier()`.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628462153

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, up to the `if (isRetriable) {` branch)

Review Comment:
Those are right, but they're just missing `UnstableOffsetCommitException`, which is retriable and thrown on fetch [here](https://github.com/apache/kafka/blob/896af1b2f2f2a7d579e0aef074bcb2004c0246f2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1073).
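For reference, `UnstableOffsetCommitException` extends `RetriableException`, so a purely type-based retriability check picks it up automatically. A tiny standalone check of that property (assuming the Kafka `clients` artifacts on the classpath; this is illustrative and not part of the PR itself):

```java
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.protocol.Errors;

public class UnstableOffsetCommitCheck {
    public static void main(String[] args) {
        // UNSTABLE_OFFSET_COMMIT maps to UnstableOffsetCommitException, which is retriable.
        Throwable e = Errors.UNSTABLE_OFFSET_COMMIT.exception();
        System.out.println(e instanceof UnstableOffsetCommitException); // true
        System.out.println(e instanceof RetriableException);            // true
    }
}
```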
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628457770

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, up to the `if (isRetriable) {` branch)

Review Comment:
After removing the boolean flags from `offsetFetchExceptionSupplier()`, it became identical to `offsetCommitExceptionSupplier()`. So I refactored things a bit further and renamed `offsetCommitExceptionSupplier()` to `offsetExceptionSupplier()` for use by the relevant parameterized tests.
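A hypothetical sketch of what such a shared, single-argument supplier looks like once the per-error boolean flags are gone (class name and exact error list are illustrative, not the code that landed); each parameterized test then derives retriability itself:

```java
import java.util.stream.Stream;

import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.params.provider.Arguments;

final class OffsetErrorSuppliers {

    // Shared by the commit and fetch parameterized tests; retriability is no longer
    // encoded here, so each test decides how to treat a given error.
    static Stream<Arguments> offsetExceptionSupplier() {
        return Stream.of(
            Arguments.of(Errors.NOT_COORDINATOR),
            Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
            Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
            Arguments.of(Errors.REQUEST_TIMED_OUT),
            Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
            Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
            Arguments.of(Errors.STALE_MEMBER_EPOCH),
            Arguments.of(Errors.UNKNOWN_MEMBER_ID));
    }
}
```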
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2150969566

Fixed `TieredStorageTestContext` breakage. Sorry about that 🤦♂️
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628442774

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, up to the `if (isRetriable) {` branch)

Review Comment:
This is what I have that's needed to make it work:

```java
private boolean isRetriableOnOffsetFetch(Errors error) {
    return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE;
}
```

Does that seem correct, @lianetm?
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628354409

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, up to the `if (isRetriable) {` branch)

Review Comment:
Yes, that makes sense to me. In the end it's a good point that we only have 2 retriable exceptions that are not considered retriable on the fetch path: unknown_topic and request timeout. So we could maybe have a helper `isRetriableOnFetch` that would return true if `error.exception() instanceof RetriableException && !unknown_topic && !request_timeout`, or something like that. Up to you @kirktrue, but it makes sense to me to get rid of the argument we'd have to maintain and to be explicit, as Bruno suggested.
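A minimal sketch of the helper described above, assuming the `Errors` and `RetriableException` types from the Kafka clients module (the name and exact exclusions follow the discussion, not necessarily what was merged):

```java
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;

final class FetchRetriability {

    // Retriable by exception type, minus the two errors the OffsetFetch path
    // deliberately does not retry.
    static boolean isRetriableOnFetch(final Errors error) {
        return error.exception() instanceof RetriableException
            && error != Errors.UNKNOWN_TOPIC_OR_PARTITION
            && error != Errors.REQUEST_TIMED_OUT;
    }

    public static void main(String[] args) {
        System.out.println(isRetriableOnFetch(Errors.COORDINATOR_NOT_AVAILABLE));  // true
        System.out.println(isRetriableOnFetch(Errors.UNSTABLE_OFFSET_COMMIT));     // true
        System.out.println(isRetriableOnFetch(Errors.REQUEST_TIMED_OUT));          // false
        System.out.println(isRetriableOnFetch(Errors.GROUP_AUTHORIZATION_FAILED)); // false
    }
}
```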
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628332835

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, up to the `if (isRetriable) {` branch)

Review Comment:
If the exceptions (not Java exceptions, but exceptions to the rule) are just a few, wouldn't it be better for readability to call them out explicitly here in the test?
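One way to read "call them out explicitly": keep the handful of fetch-retriable errors in a named constant next to the test instead of deriving them from the exception hierarchy. A sketch only; the set mirrors the helper quoted earlier in the thread, and the names are illustrative:

```java
import java.util.EnumSet;
import java.util.Set;

import org.apache.kafka.common.protocol.Errors;

final class FetchRetriableErrors {

    // The few errors the OffsetFetch path actually retries, spelled out explicitly.
    // (Per the thread, UNSTABLE_OFFSET_COMMIT is also retriable on fetch but is
    // exercised by a separate test.)
    static final Set<Errors> RETRIABLE_ON_OFFSET_FETCH = EnumSet.of(
        Errors.NOT_COORDINATOR,
        Errors.COORDINATOR_LOAD_IN_PROGRESS,
        Errors.COORDINATOR_NOT_AVAILABLE);

    public static void main(String[] args) {
        System.out.println(RETRIABLE_ON_OFFSET_FETCH.contains(Errors.COORDINATOR_NOT_AVAILABLE)); // true
        System.out.println(RETRIABLE_ON_OFFSET_FETCH.contains(Errors.REQUEST_TIMED_OUT));         // false
    }
}
```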
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628330597

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, up to the `if (isRetriable) {` branch)

Review Comment:
I would say this one should stay like this, because this is the fetch path (where we do have errors that are retriable by their exception definition, but that we don't consider retriable).
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2150792103

One failure with the TestUtils changes, breaking TieredStorageTestContext.java. Could you please check, @kirktrue? Thanks!
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628321190

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +613,46 @@: the new `testOffsetFetchRequestTimeoutRequests(Errors, boolean)` parameterized test, up to the `if (isRetriable) {` branch)

Review Comment:
Do you also plan to use `error.exception() instanceof RetriableException` here?
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628306229

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##

(quoted hunk @@ -960,7 +960,7 @@ in `maybeReconcile()`: `commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs))` becomes `commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs))`)

Review Comment:
You can do it as a MINOR pull request if you want, but it really is just a renaming of that variable and of the parameter in the constructor, as you correctly stated in the ticket. Regarding your point in the ticket that there is more to it than changing the name, I cannot completely follow. When somebody instantiates a `MembershipManagerImpl` and passes the config `rebalance.timeout.ms` into the constructor for parameter `commitTimeoutDuringReconciliation`, they state that they want to use the value of config `rebalance.timeout.ms` as `commitTimeoutDuringReconciliation`. This seems quite clear to me.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628279848

## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ##

(quoted hunk @@ -238,6 +238,20 @@: a new test preceded by the comment "// Ensure TestUtils polls with ZERO. This fails for the new consumer only.")

Review Comment:
Great, the outcome is what matters: we have an integration test now passing with the new consumer, making progress on continuous poll ZERO, yay!
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628208730

## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ##

(quoted hunk @@ -238,6 +238,20 @@: a new test preceded by the comment "// Ensure TestUtils polls with ZERO. This fails for the new consumer only.")

Review Comment:
I'm not sure what I did wrong the first time. I was able to add my changes back and it works now for all combinations.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628166108

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -827,26 +808,15 @@ in `RetriableRequestState`: `maybeExpire(long currentTimeMs)` removed; `buildRequestWithResponseHandling(...)` now passes `time.timer(requestTimeoutMs)` into the `UnsentRequest`; comment anchored on `long currentTimeMs = request.handler().completionTimeMs();`)

Review Comment:
`'s/currentTimeMs/completionTimeMs/'`
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628164186

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##

(quoted hunk @@ -960,7 +960,7 @@ in `maybeReconcile()`: `getExpirationTimeForTimeout(rebalanceTimeoutMs)` becomes `getDeadlineMsForTimeout(rebalanceTimeoutMs)`)

Review Comment:
Created KAFKA-16899 as a little bit of thought needs to be given to how the name change is handled.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628148498

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -831,10 +823,9 @@: the javadoc "Complete the request future with a TimeoutException if the request timeout has been reached, based on the provided current time.")

Review Comment:
Updated. PTAL.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628147009

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +614,44 @@: the retriable branch of `testOffsetFetchRequestTimeoutRequests`, which sleeps `defaultApiTimeoutMs`, polls again so each `OffsetFetchRequestState` is evaluated via `isExpired()`, and expects `TimeoutException`; the non-retriable branch expects `KafkaException` and calls `assertEmptyPendingRequests(commitRequestManager)`)

Review Comment:
Added checks.
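The expiry check in the test above is driven entirely by the mock clock; a standalone sketch of that pattern, using the public `MockTime`/`Timer` utilities from the clients test artifacts and independent of the PR's classes:

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Timer;

public class MockTimeExpiryDemo {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        Timer timer = time.timer(100);          // e.g. a 100 ms request timeout

        System.out.println(timer.isExpired());  // false: no time has "passed" yet

        time.sleep(100);                        // advance the mock clock past the timeout
        timer.update(time.milliseconds());      // re-read the clock, as poll() effectively does
        System.out.println(timer.isExpired());  // true: the request would now be expired
    }
}
```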
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628143167

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -1097,19 +1132,18 @@: `offsetCommitExceptionSupplier()` rewritten so each entry is an `Arguments.of(error, isRetriable)` pair, with `Errors.STALE_MEMBER_EPOCH` no longer in the list)

Review Comment:
Fixed.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628142116

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -432,7 +436,7 @@ in `commitSyncWithRetries(...)`: `if (error instanceof TimeoutException && requestAttempt.isExpired)` becomes `if (error instanceof TimeoutException && requestAttempt.isExpired())`)

Review Comment:
I've updated the logic that checks `isExpired()`. PTAL at the latest version. Thanks!
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628140810

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java: ##

(quoted hunk @@ -123,23 +122,6 @@: removal of `testEnsureTimerSetOnAdd()`, which verified that `NetworkClientDelegate#add` and `#addAll` set `REQUEST_TIMEOUT_MS` on the request timer)

Review Comment:
This change was reverted.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628139749

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ##

(quoted hunk @@ -235,7 +235,6 @@ in `add(final UnsentRequest r)`: removal of `r.setTimer(this.time, this.requestTimeoutMs);`)

Review Comment:
Thanks for the reminder to update the PR description, @lianetm! Done.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628082521

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -827,26 +808,15 @@ in `RetriableRequestState`: `maybeExpire(...)` removed and the request built with `time.timer(requestTimeoutMs)`; comment anchored on `long currentTimeMs = request.handler().completionTimeMs();`)

Review Comment:
Sure!
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628049433

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ##

(quoted hunk @@ -235,7 +235,6 @@ in `add(final UnsentRequest r)`: removal of `r.setTimer(this.time, this.requestTimeoutMs);`)

Review Comment:
I see from the commits that we reverted back to the original approach, where the request timeout is defined at the network client level, common for all requests, and passed to the request on add. Nice! Let's just update the PR description then (we can remove point 4).
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628023841

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -831,10 +823,9 @@: the javadoc "Complete the request future with a TimeoutException if the request timeout has been reached, based on the provided current time.")

Review Comment:
Let's update this to include the "if the request has been sent out at least once and... (timeout expired)" wording.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1628017722

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java: ##

(quoted hunk: the new `TimedRequestStateTest` class, covering `testIsExpired`, `testRemainingMs`, `testDeadlineTimer`, `testAllowOverdueDeadlineTimer`, and `testToStringUpdatesTimer`; the comment is anchored on its `assertToString` helper:)

```java
private void assertToString(TimedRequestState state, long timerMs) {
    String[] toString = state.toString().split(", ");
    assertTrue(toString.length > 0);
    String expected = "remainingMs=" + timerMs + "}";
    String actual = toString[toString.length - 1];
    assertEquals(expected, actual);
}
```

Review Comment:
What about simplifying all this with a single:

```suggestion
        assertTrue(state.toString().contains("remainingMs=" + timerMs + "}"));
```
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1627986247

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +614,44 @@: the retriable branch of `testOffsetFetchRequestTimeoutRequests`, which sleeps `defaultApiTimeoutMs`, polls again, and expects `TimeoutException`; the non-retriable branch expects `KafkaException` and calls `assertEmptyPendingRequests(commitRequestManager)`)

Review Comment:
I see. Still, even if we leave the in-flight requests, it would be helpful to assert that we're not leaving any unsent requests after we expire them on ln 637/638. Something like this would be reassuring:

```
assertFalse(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
commitRequestManager.poll(time.milliseconds());
futures.forEach(f -> assertFutureThrows(f, TimeoutException.class));
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
```
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626764237

## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ##

(quoted hunk @@ -238,6 +238,20 @@: a new test preceded by the comment "// Ensure TestUtils polls with ZERO. This fails for the new consumer only.")

Review Comment:
That is weird 🤔 I'll try to add it back in as part of the next round of feedback changes.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2148602850

@lianetm @cadonna I believe I've handled the feedback (thanks!) and this is ready for another review.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626761730

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -1097,19 +1132,18 @@: `offsetCommitExceptionSupplier()` rewritten as `Arguments.of(error, isRetriable)` pairs, with `Errors.STALE_MEMBER_EPOCH` no longer in the list)

Review Comment:
Nice catch. Thanks!
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626759185

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ##

(quoted hunk: the new `TimedRequestState` class, a `RequestState` that carries a `Timer` and exposes `isExpired()`, `remainingMs()`, and a static `deadlineTimer(Time, long)` helper; the comment is anchored on its `toStringBase()` override, `return super.toStringBase() + ", timer=" + remainingMs();`)

Review Comment:
Updated to use `remainingMs`.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ##

(quoted hunk @@ -313,7 +306,7 @@ in `UnsentRequest#toString()`: `", timer=" + timer` becomes `", timer=" + timer.remainingMs()`)

Review Comment:
Changed. Thanks!
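A standalone sketch (not the PR's code) of the clamping behavior the `deadlineTimer(Time, long)` helper summarized above provides: it converts an absolute deadline into a `Timer`, and a deadline that is already in the past yields zero remaining time rather than a negative value.

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class DeadlineTimerDemo {

    // Same idea as the helper: clamp (deadline - now) at zero before building the timer.
    static Timer deadlineTimer(Time time, long deadlineMs) {
        long diff = Math.max(0, deadlineMs - time.milliseconds());
        return time.timer(diff);
    }

    public static void main(String[] args) {
        Time time = new MockTime();

        Timer future = deadlineTimer(time, time.milliseconds() + 500);
        Timer overdue = deadlineTimer(time, time.milliseconds() - 500);

        System.out.println(future.remainingMs());  // 500
        System.out.println(overdue.remainingMs()); // 0 (already expired, but never negative)
    }
}
```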
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626759656

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -875,8 +872,12 @@: `retryTimeoutExpired(long currentTimeMs)` replaced by a no-argument `retryTimeoutExpired()` documented as "If at least one attempt has been sent, and the user-provided timeout has elapsed, consider the request as expired.")

Review Comment:
...and folding the checks into the caller directly.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -516,8 +527,9 @@ in `fetchOffsetsWithRetries(...)`: the expired retriable case now logs "Fetch request for {} timed out and won't be retried anymore" before completing exceptionally)

Review Comment:
Done.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626759390

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -875,8 +872,12 @@: the no-argument `retryTimeoutExpired()` with the "at least one attempt has been sent" javadoc)

Review Comment:
I solved it by removing the method altogether 😉
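A sketch of the expiration rule from that javadoc (purely illustrative; per the comment above, the PR ultimately inlined this at the call site rather than keeping a named method): a request only counts as expired once it has been sent at least once and its user-provided timeout has elapsed.

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Timer;

public class RetryTimeoutRule {

    // The rule from the javadoc: expired = (at least one attempt sent) && (timer elapsed).
    static boolean retryTimeoutExpired(final int numAttempts, final Timer timer) {
        timer.update();
        return numAttempts > 0 && timer.isExpired();
    }

    public static void main(String[] args) {
        MockTime time = new MockTime();
        Timer timer = time.timer(100);

        System.out.println(retryTimeoutExpired(0, timer)); // false: nothing sent yet
        time.sleep(100);
        System.out.println(retryTimeoutExpired(0, timer)); // false: timeout elapsed but never sent
        System.out.println(retryTimeoutExpired(1, timer)); // true: sent once and timed out
    }
}
```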
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626752933

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

(quoted hunk @@ -613,15 +614,44 @@: the retriable branch of `testOffsetFetchRequestTimeoutRequests` with the sleep/poll expiry check, and the non-retriable branch with `assertEmptyPendingRequests(commitRequestManager)`)

Review Comment:
I wasn't removing the in-flight requests since the code in the response handler removes them. If we did remove the in-flight requests before the response was received, the user would see the warning about the request not being in the "buffer" when it did complete.
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626748855

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

(quoted hunk @@ -319,11 +326,11 @@ in `autoCommitSyncBeforeRevocationWithRetries(...)`: the draft moved the `requestAttempt.isExpired()` check ahead of the `RetriableException` check; the comment is anchored on that reordering)

Review Comment:
I reverted the code so that we only check `isExpired()` on `RetriableException`s and added unit tests to verify behavior. PTAL.
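A sketch of the reverted control flow as described (illustrative names only; the real logic lives in `CommitRequestManager`'s retry callbacks): only retriable errors consult the request's expiration, and once the user-provided timeout has elapsed the error is surfaced instead of retried.

```java
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;

final class RetryOrExpire {

    // Decide what to do with a failed attempt: non-retriable errors fail immediately,
    // retriable errors retry until the request's own timer reports expiration.
    static String onError(final Throwable error, final boolean requestExpired) {
        if (error instanceof RetriableException) {
            return requestExpired ? "complete exceptionally (TimeoutException)" : "retry with backoff";
        }
        return "complete exceptionally (non-retriable)";
    }

    public static void main(String[] args) {
        System.out.println(onError(new TimeoutException("request timed out"), false)); // retry with backoff
        System.out.println(onError(new TimeoutException("request timed out"), true));  // complete exceptionally (TimeoutException)
        System.out.println(onError(new IllegalStateException("fatal"), true));         // complete exceptionally (non-retriable)
    }
}
```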
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2148416617 Hey @kirktrue, I just completed another full pass and left some comments, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626603047 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -613,15 +614,44 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool } } +@ParameterizedTest +@MethodSource("offsetFetchExceptionSupplier") +public void testOffsetFetchRequestTimeoutRequests(final Errors error, + final boolean isRetriable) { +CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + +Set partitions = new HashSet<>(); +partitions.add(new TopicPartition("t1", 0)); +List>> futures = sendAndVerifyDuplicatedOffsetFetchRequests( +commitRequestManager, +partitions, +1, +error); + +if (isRetriable) { +futures.forEach(f -> assertFalse(f.isDone())); + +// Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each +// OffsetFetchRequestState is evaluated via isExpired(). +time.sleep(defaultApiTimeoutMs); +commitRequestManager.poll(time.milliseconds()); +futures.forEach(f -> assertFutureThrows(f, TimeoutException.class)); +} else { +futures.forEach(f -> assertFutureThrows(f, KafkaException.class)); +assertEmptyPendingRequests(commitRequestManager); Review Comment: Actually this makes me wonder, are we removing the inflight fetch requests when we expire the request? I can only see them removed when we receive a response, but not when we intentionally expire the requests (where we remove only the unsent) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626596195 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -613,15 +614,44 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool } } +@ParameterizedTest +@MethodSource("offsetFetchExceptionSupplier") +public void testOffsetFetchRequestTimeoutRequests(final Errors error, + final boolean isRetriable) { +CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + +Set partitions = new HashSet<>(); +partitions.add(new TopicPartition("t1", 0)); +List>> futures = sendAndVerifyDuplicatedOffsetFetchRequests( +commitRequestManager, +partitions, +1, +error); + +if (isRetriable) { +futures.forEach(f -> assertFalse(f.isDone())); + +// Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each +// OffsetFetchRequestState is evaluated via isExpired(). +time.sleep(defaultApiTimeoutMs); +commitRequestManager.poll(time.milliseconds()); +futures.forEach(f -> assertFutureThrows(f, TimeoutException.class)); +} else { +futures.forEach(f -> assertFutureThrows(f, KafkaException.class)); +assertEmptyPendingRequests(commitRequestManager); Review Comment: I guess we should be able to assert this no matter if retriable or not (just because in this test we're forcing a timeout on retriables). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626569362 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -1097,19 +1132,18 @@ private void testNonRetriable(final List offsetCommitExceptionSupplier() { return Stream.of( -Arguments.of(Errors.NOT_COORDINATOR), -Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS), -Arguments.of(Errors.UNKNOWN_SERVER_ERROR), -Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED), -Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE), -Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE), -Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION), -Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE), -Arguments.of(Errors.REQUEST_TIMED_OUT), -Arguments.of(Errors.FENCED_INSTANCE_ID), -Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED), -Arguments.of(Errors.STALE_MEMBER_EPOCH), -Arguments.of(Errors.UNKNOWN_MEMBER_ID)); +Arguments.of(Errors.NOT_COORDINATOR, true), +Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true), +Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false), +Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false), +Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false), +Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false), +Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false), +Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true), +Arguments.of(Errors.REQUEST_TIMED_OUT, false), +Arguments.of(Errors.FENCED_INSTANCE_ID, false), +Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false), +Arguments.of(Errors.UNKNOWN_MEMBER_ID, false)); Review Comment: I would say we don't need to keep/maintain this new argument here to know if the exception is retriable. We can simply check `error.exception() instanceof RetriableException` when needed?? For the record, in the case of the `offsetFetchExceptionSupplier` below, we do need the argument because there are retriable exceptions that we don't want to consider retriable when fetching offsets (Unknown_topic IIRC) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
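[Editor's aside] As a concrete illustration of the suggestion above, retriability can be derived from the error itself instead of being carried as an extra `MethodSource` argument. A small standalone sketch using the public `Errors` enum:

```java
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;

final class RetriabilityFromErrorSketch {

    // The boolean argument in the supplier can be replaced by this check.
    static boolean isRetriable(final Errors error) {
        return error.exception() instanceof RetriableException;
    }

    public static void main(String[] args) {
        System.out.println(isRetriable(Errors.NOT_COORDINATOR));            // true
        System.out.println(isRetriable(Errors.GROUP_AUTHORIZATION_FAILED)); // false
    }
}
```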
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626550778 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + +private final Timer timer; + +public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); +this.timer = timer; +} + +public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final int retryBackoffExpBase, + final long retryBackoffMaxMs, + final double jitter, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter); +this.timer = timer; +} + +public boolean isExpired() { +timer.update(); +return timer.isExpired(); +} + +public long remainingMs() { +timer.update(); +return timer.remainingMs(); +} + +public static Timer deadlineTimer(final Time time, final long deadlineMs) { +long diff = Math.max(0, deadlineMs - time.milliseconds()); +return time.timer(diff); +} + + +@Override +protected String toStringBase() { +return super.toStringBase() + ", timer=" + remainingMs(); Review Comment: same as for UnsentRequests toString, seeing "timer=100" will be confusing to understand that 100 is the remaining time in ms. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626544295 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -313,7 +306,7 @@ public String toString() { "requestBuilder=" + requestBuilder + ", handler=" + handler + ", node=" + node + -", timer=" + timer + +", timer=" + timer.remainingMs() + Review Comment: ```suggestion ", remainingMs=" + timer.remainingMs() + ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626534240 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -235,7 +235,6 @@ public void addAll(final List requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); -r.setTimer(this.time, this.requestTimeoutMs); Review Comment: uhm well there is a difference: before these changes, the timeout was defined at the `NetworkClientDelegate` level and not per request, which made sense I would say, since it's the same `REQUEST_TIMEOUT_MS_CONFIG` that needs to be applied to all UnsentRequests, so it was applied here on this ln 238 when any request was added. Now we're saying that each UnsentRequest may have a diff timeout, but I see we end up with each manager taking that same REQUEST_TIMEOUT_MS_CONFIG config and passing it to each request. Outcome is the same, but actually the original version of applying the REQUEST_TIMEOUT_MS_CONFIG to all requests in the network layer seemed simpler I would say (unless I'm missing a gain of this option of having the REQUEST_TIMEOUT_MS_CONFIG at the managers level). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
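[Editor's aside] To make the trade-off above concrete, here is a small sketch with hypothetical helper names, contrasting the two placements: stamping every request with the same `request.timeout.ms` centrally in the network layer versus letting each manager construct the timer when it builds the request.

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

final class TimerPlacementSketch {

    // Previous behaviour: the network layer applied one request.timeout.ms
    // to every UnsentRequest as it was added.
    static Timer stampCentrally(final Time time, final long requestTimeoutMs) {
        return time.timer(requestTimeoutMs);
    }

    // This PR: the owning manager creates the timer up front, which leaves
    // room for per-request deadlines even though today every manager passes
    // the same request.timeout.ms value.
    static Timer stampInManager(final Time time, final long deadlineMs) {
        return time.timer(Math.max(0, deadlineMs - time.milliseconds()));
    }
}
```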
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626513292 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -875,8 +872,12 @@ private void handleClientResponse(final ClientResponse response, abstract void onResponse(final ClientResponse response); -boolean retryTimeoutExpired(long currentTimeMs) { -return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs; +/** + * If at least one attempt has been sent, and the user-provided timeout has elapsed, consider the + * request as expired. + */ +boolean retryTimeoutExpired() { Review Comment: name nit: this is not only about timeout expired anymore, it considers if the request was sent (significant conceptual diff I would say), so what about aligning the name? (`sentAndExpired` maybe, or whatever you prefer to better reflect what it actually checks now) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626501003 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -516,8 +527,9 @@ private void fetchOffsetsWithRetries(final OffsetFetchRequestState fetchRequest, result.complete(res); } else { if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { -if (error instanceof TimeoutException && fetchRequest.isExpired) { -result.completeExceptionally(error); +if (fetchRequest.isExpired()) { +log.debug("Fetch request for {} timed out and won't be retried anymore", fetchRequest.requestedPartitions); Review Comment: nit: let's use "OffsetFetch request" to avoid confusion with Fetch requests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626479394 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ## @@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { runMultiConsumerSessionTimeoutTest(true) } + // Ensure TestUtils polls with ZERO. This fails for the new consumer only. Review Comment: hey, I ran it locally again (I only change TestUtils ln 908 to poll with Duration.ZERO as a quick shortcut): - on trunk: test fails for new consumer only, passes for all other combinations - on this branch with latest changes: test passes for all combinations (yay!) Not sure what made you have it failing for all combinations but to me this PR seems to be fixing the issue, worth double checking to add the test if possible, it's a very good validation of the changes. (or ok to leave it out if you prefer, but good news that it seems to work here already) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
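[Editor's aside] The check being exercised here is essentially that a zero-duration poll returns without blocking past the deadline. A minimal Java equivalent of the Scala shortcut described above (consumer construction and the topic name are illustrative):

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class ZeroDurationPollSketch {

    static <K, V> ConsumerRecords<K, V> pollOnce(final Consumer<K, V> consumer) {
        consumer.subscribe(Collections.singletonList("test-topic"));
        // poll(Duration.ZERO) must return promptly; the background request
        // managers are responsible for honoring the user-provided timeout.
        return consumer.poll(Duration.ZERO);
    }
}
```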
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626395594 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ## @@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { runMultiConsumerSessionTimeoutTest(true) } + // Ensure TestUtils polls with ZERO. This fails for the new consumer only. Review Comment: I've removed this for now, because the test won't pass until later PRs are merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626391190 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -960,7 +960,7 @@ void maybeReconcile() { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. -commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs)); +commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs)); Review Comment: Can we handle this as a separate Jira or even a `[MINOR]` pull request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626390121 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState { */ abstract CompletableFuture future(); -/** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. - */ -void maybeExpire(long currentTimeMs) { -if (retryTimeoutExpired(currentTimeMs)) { -removeRequest(); -isExpired = true; -future().completeExceptionally(new TimeoutException(requestDescription() + -" could not complete before timeout expired.")); -} -} - /** * Build request with the given builder, including response handling logic. */ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator()); +coordinatorRequestManager.coordinator(), +time.timer(requestTimeoutMs) +); request.whenComplete( (response, throwable) -> { long currentTimeMs = request.handler().completionTimeMs(); Review Comment: Can we handle this as a separate Jira or even a `[MINOR]` pull request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626375578 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { Review Comment: Added `TimedRequestStateTest`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1280,7 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx if (autoCommitEnabled) autoCommitSync(timer); -applicationEventHandler.add(new CommitOnCloseEvent()); +applicationEventHandler.add(new CommitOnCloseEvent(calculateDeadlineMs(timer))); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626339539 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1280,7 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx if (autoCommitEnabled) autoCommitSync(timer); -applicationEventHandler.add(new CommitOnCloseEvent()); +applicationEventHandler.add(new CommitOnCloseEvent(calculateDeadlineMs(timer))); Review Comment: You're right. Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626331279 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + +private final Timer timer; + +public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); +this.timer = timer; +} + +TimedRequestState(final LogContext logContext, Review Comment: I made this `public` to match the other. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { Review Comment: Will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626329144 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + +private final Timer timer; + +public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); +this.timer = timer; +} + +TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final int retryBackoffExpBase, + final long retryBackoffMaxMs, + final double jitter, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter); +this.timer = timer; +} + +public boolean isExpired() { +timer.update(); +return timer.isExpired(); +} + +public long remainingMs() { +timer.update(); +return timer.remainingMs(); +} + +public static Timer deadlineTimer(final Time time, final long deadlineMs) { +// Prevent the timer from being negative. Review Comment: Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
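[Editor's aside] The clamping that the removed comment described is easy to pin down in a unit test. A simplified sketch along the lines of the `TimedRequestStateTest` added in this PR (illustrative only, assuming the `MockTime` test utility and same-package access to `TimedRequestState`):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Timer;
import org.junit.jupiter.api.Test;

class DeadlineTimerSketchTest {

    @Test
    void deadlineInThePastYieldsAnExpiredTimer() {
        MockTime time = new MockTime();
        time.sleep(1_000L);

        // Deadline 500 ms in the past: remaining time is clamped to zero.
        Timer timer = TimedRequestState.deadlineTimer(time, time.milliseconds() - 500L);
        assertEquals(0L, timer.remainingMs());
        assertTrue(timer.isExpired());
    }
}
```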
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626327558 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ## @@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { runMultiConsumerSessionTimeoutTest(true) } + // Ensure TestUtils polls with ZERO. This fails for the new consumer only. Review Comment: I updated the code to allow us to pass in a duration of 0, and it doesn't work for `CONSUMER` _or_ `CLASSIC` 🤔 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -469,7 +474,7 @@ private Throwable commitAsyncExceptionForError(Throwable error) { * Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}. * * @param partitions Partitions to fetch offsets for. - * @param deadlineMs Time until which the request should be retried if it fails Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626328276 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -465,39 +474,39 @@ private Throwable commitAsyncExceptionForError(Throwable error) { * Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}. * * @param partitions Partitions to fetch offsets for. - * @param expirationTimeMs Time until which the request should be retried if it fails + * @param deadlineMs Time until which the request should be retried if it fails Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626327813 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -391,7 +398,7 @@ public CompletableFuture commitAsync(final Map
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626193338 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -960,7 +960,7 @@ void maybeReconcile() { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. -commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs)); +commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs)); Review Comment: Ah, I see! That makes sense! Then I propose to rename the field to something like `commitTimeoutDuringReconciliation` to make that clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626188794 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1280,7 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx if (autoCommitEnabled) autoCommitSync(timer); -applicationEventHandler.add(new CommitOnCloseEvent()); +applicationEventHandler.add(new CommitOnCloseEvent(calculateDeadlineMs(timer))); Review Comment: I don't quite get why we need this event as completable with a deadline. We issued a blocking commit sync above on ln 1281 (that one needs deadline, and it has it from the timer, and will be expired if needed etc.). But this `CommitOnCloseEvent` is only flipping a flag in the commit manager to not allow any more commits ([this](https://github.com/apache/kafka/blob/55d38efcc5505a5a1bddb08ba05f4d923f8050f9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L187)). I may be missing something, but this is not the kind of event that needs a deadline to complete or be reaped (aka. CompletableApplicationEvent) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
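[Editor's aside] A heavily simplified, hypothetical sketch of the point being made: an event whose only job is to flip a flag in the commit manager needs neither a deadline nor a future to complete or reap.

```java
final class CommitOnCloseSketch {

    private volatile boolean closing = false;

    // Fire-and-forget handler: nothing to complete, nothing for the reaper.
    void onCommitOnCloseEvent() {
        closing = true;
    }

    boolean newCommitsAllowed() {
        return !closing;
    }
}
```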
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1626147694 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -960,7 +960,7 @@ void maybeReconcile() { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. -commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs)); +commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs)); Review Comment: Hey @cadonna, your assumption is right: "the whole reconciliation is limited by the rebalance timeout", but the trick is that it's the broker who enforces that, not the client. That's why we don't see the `rebalanceTimeout` being used on the client side, other than for sending it to the broker in the initial HB, and here to limit the commit retries. We use it on the commit only because we needed to put a limit on the retries to make sure that if the commit keeps failing and being retried, the request is not kept forever on the client after the broker kicks the member out of the group. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1625726216 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -319,11 +326,11 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState if (error == null) { result.complete(null); } else { -if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { -if (error instanceof TimeoutException && requestAttempt.isExpired()) { -log.debug("Auto-commit sync before revocation timed out and won't be retried anymore"); -result.completeExceptionally(error); -} else if (error instanceof UnknownTopicOrPartitionException) { +if (requestAttempt.isExpired()) { Review Comment: Follow-up to https://github.com/apache/kafka/pull/16031#discussion_r1624620726 I am not sure if this is good. Let's assume I get a non-retriable error AND the timeout was exceeded. Do I really want to get a timeout exception wrapped around the non-retriable error? I want a timeout exception when I can improve the situation by increasing the timeout. If I get a non-retriable error, I cannot improve the situation by increasing the timeout. So, I would only throw a timeout exception when I get a retriable error. WDYT? This affects also the other location where the expiration is verified. Either way, tests are required that test the combination of (non-)retriable error and expiration. ## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ## @@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { runMultiConsumerSessionTimeoutTest(true) } + // Ensure TestUtils polls with ZERO. This fails for the new consumer only. Review Comment: I do not understand this comment. Could you please elaborate? If I look where the poll is done, a duration of 100 ms is used. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -465,39 +474,39 @@ private Throwable commitAsyncExceptionForError(Throwable error) { * Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}. * * @param partitions Partitions to fetch offsets for. - * @param expirationTimeMs Time until which the request should be retried if it fails + * @param deadlineMs Time until which the request should be retried if it fails Review Comment: nit: the indentation is not consistent ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -391,7 +398,7 @@ public CompletableFuture commitAsync(final Maphttp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { Review Comment: Could you add unit tests for this class? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@l
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1625709036 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -960,7 +960,7 @@ void maybeReconcile() { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. -commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs)); +commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs)); Review Comment: My assumption was that the whole reconciliation is limited by the rebalance timeout. But here it seems that we only have a timeout on the auto commit sync. I would have expected that different parts of the reconciliation would update a timer and the commit sync would take the remaining time as the timeout. But maybe I missing something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2146294482 @lianetm @cadonna—this is ready for another review. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624997659 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -747,21 +748,19 @@ public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() { new TopicPartition("topic", 1), new OffsetAndMetadata(0)); -// Send sync offset commit request that fails with retriable error. -long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; -CompletableFuture commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); -completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); +// Send sync offset commit request. +long deadlineMs = time.milliseconds() + retryBackoffMs * 2; +CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs); -// Request retried after backoff, and fails with retriable again. Should not complete yet +// Make the first request fail with a retriable error. Should not complete yet // given that the request timeout hasn't expired. time.sleep(retryBackoffMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); assertFalse(commitResult.isDone()); // Sleep to expire the request timeout. Request should fail on the next poll. time.sleep(retryBackoffMs); -NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); -assertEquals(0, res.unsentRequests.size()); +completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); Review Comment: This code is reverted back to the original form now that we prune expired requests that have been attempted at least once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624996705 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest createUnsentRequest( private void handleError(final Throwable exception, final long completionTimeMs) { if (exception instanceof RetriableException) { -if (completionTimeMs >= expirationTimeMs) { +if (isExpired()) { Review Comment: I refactored the code so that `handleResponse()` calls `handleError()` if it hits an error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624992826 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -61,30 +62,28 @@ */ public class TopicMetadataRequestManager implements RequestManager { +private final Time time; private final boolean allowAutoTopicCreation; private final List inflightRequests; +private final int requestTimeoutMs; private final long retryBackoffMs; private final long retryBackoffMaxMs; private final Logger log; private final LogContext logContext; -public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { +public TopicMetadataRequestManager(final LogContext context, final Time time, final ConsumerConfig config) { logContext = context; log = logContext.logger(getClass()); +this.time = time; inflightRequests = new LinkedList<>(); +requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); } @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { -// Prune any requests which have timed out -List expiredRequests = inflightRequests.stream() -.filter(req -> req.isExpired(currentTimeMs)) -.collect(Collectors.toList()); -expiredRequests.forEach(TopicMetadataRequestState::expire); - Review Comment: I've updated the logic in `CommitRequestManager` and `TopicMetadataRequestManager` to proactively prune expired `RequestState`s that have had at least one request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
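[Editor's aside] A hedged sketch (illustrative interface and names, not the actual manager code) of the pruning rule described above: on each poll, request states that have been attempted at least once and are past their deadline are removed, and their futures are failed with a `TimeoutException`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.TimeoutException;

final class ExpiredRequestPruningSketch {

    interface ExpirableRequest {
        boolean attemptedAtLeastOnce();
        boolean isExpired();
        CompletableFuture<?> future();
    }

    static void pruneExpired(final List<? extends ExpirableRequest> requests) {
        for (Iterator<? extends ExpirableRequest> it = requests.iterator(); it.hasNext(); ) {
            ExpirableRequest request = it.next();
            if (request.attemptedAtLeastOnce() && request.isExpired()) {
                it.remove();
                request.future().completeExceptionally(
                    new TimeoutException("Request could not complete before timeout expired"));
            }
        }
    }
}
```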
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624957099 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState { */ abstract CompletableFuture future(); -/** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. - */ -void maybeExpire(long currentTimeMs) { -if (retryTimeoutExpired(currentTimeMs)) { -removeRequest(); -isExpired = true; -future().completeExceptionally(new TimeoutException(requestDescription() + -" could not complete before timeout expired.")); -} -} - /** * Build request with the given builder, including response handling logic. */ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator()); +coordinatorRequestManager.coordinator(), +time.timer(requestTimeoutMs) +); request.whenComplete( (response, throwable) -> { long currentTimeMs = request.handler().completionTimeMs(); Review Comment: Agreed that it's confusing, but for the record, I guess the "current" in the name may come from the point of view that this is the moment a request completes and we retrieve the completion time, so I could see it as "current" because of where it's called (but still +1 for a better name, simply `completionTimeMs`, which btw aligns with the `handleClientResponse` func param where it's used right below). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624788878 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -235,7 +235,6 @@ public void addAll(final List requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); -r.setTimer(this.time, this.requestTimeoutMs); Review Comment: It _shouldn't_. My preference is to make data as immutable as possible. Setting the `Timer` on creation vs. after the fact seems cleaner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624815072 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + +private final Timer timer; + +public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); +this.timer = timer; +} + +// Visible for testing Review Comment: I removed the false comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624812730 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest createUnsentRequest( private void handleError(final Throwable exception, final long completionTimeMs) { if (exception instanceof RetriableException) { -if (completionTimeMs >= expirationTimeMs) { +if (isExpired()) { Review Comment: I will take a look at refactoring this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624804702 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState { */ abstract CompletableFuture future(); -/** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. - */ -void maybeExpire(long currentTimeMs) { -if (retryTimeoutExpired(currentTimeMs)) { -removeRequest(); -isExpired = true; -future().completeExceptionally(new TimeoutException(requestDescription() + -" could not complete before timeout expired.")); -} -} - /** * Build request with the given builder, including response handling logic. */ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator()); +coordinatorRequestManager.coordinator(), +time.timer(requestTimeoutMs) +); request.whenComplete( (response, throwable) -> { long currentTimeMs = request.handler().completionTimeMs(); Review Comment: I don't know 🤷♂️ Every so often I think about changing it, but I generally don't include changes that are unrelated to my PR. Would you like me to change it in this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624803424 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + +private final Timer timer; + +public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); +this.timer = timer; +} + +// Visible for testing Review Comment: Good catch! You're right! Where that constructor is called, there is a comment that it's for testing only, so I applied that same comment in `TimedRequestState`. But the existing comment is a lie, so my code perpetuates it. 🤦♂️ I'll remove it from the new code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624801103 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -960,7 +960,7 @@ void maybeReconcile() { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. -commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs)); +commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs)); Review Comment: I might misunderstand your question—are you suggesting to pass in the _timeout_ vs. the _deadline_? There's no `Timer` in the `getDeadlineMsForTimeout`. It just calculates the deadline from the timeout and handles overflow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
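Since the helper mentioned above only turns a timeout into a deadline while guarding against overflow, here is a small self-contained sketch of that calculation. The method name `deadlineMs` and the clamping behaviour are assumptions for illustration, not the actual `getDeadlineMsForTimeout` implementation.

```java
// Sketch: compute an absolute deadline from a relative timeout, clamping on overflow
// (e.g. a caller passing Long.MAX_VALUE to mean "wait forever").
final class DeadlineSketch {

    static long deadlineMs(final long currentTimeMs, final long timeoutMs) {
        long deadline = currentTimeMs + timeoutMs;
        // Both inputs are non-negative, so a negative sum means the addition overflowed.
        return deadline < 0 ? Long.MAX_VALUE : deadline;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(deadlineMs(now, 30_000L));        // now + 30s
        System.out.println(deadlineMs(now, Long.MAX_VALUE)); // clamped to Long.MAX_VALUE
    }
}
```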
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624792665 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -61,30 +62,28 @@ */ public class TopicMetadataRequestManager implements RequestManager { +private final Time time; private final boolean allowAutoTopicCreation; private final List inflightRequests; +private final int requestTimeoutMs; private final long retryBackoffMs; private final long retryBackoffMaxMs; private final Logger log; private final LogContext logContext; -public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { +public TopicMetadataRequestManager(final LogContext context, final Time time, final ConsumerConfig config) { logContext = context; log = logContext.logger(getClass()); +this.time = time; inflightRequests = new LinkedList<>(); +requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); } @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { -// Prune any requests which have timed out -List expiredRequests = inflightRequests.stream() -.filter(req -> req.isExpired(currentTimeMs)) -.collect(Collectors.toList()); -expiredRequests.forEach(TopicMetadataRequestState::expire); - Review Comment: It makes sense to have `poll()` expire requests that are _in flight_. I don't think `poll()` should expire _unsent_ requests, because we want to let them have a chance to run at least once. I'll change the logic to have `poll()` expire `Future`s related to requests that have been sent but that are now expired. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624620726 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, result.complete(null); } else { if (error instanceof RetriableException) { -if (error instanceof TimeoutException && requestAttempt.isExpired) { +if (error instanceof TimeoutException && requestAttempt.isExpired()) { Review Comment: If I remember right, that was needed at some point to avoid retrying on [this](https://github.com/apache/kafka/blob/a68a1cce824a8346509d5194e0e43a3cb36ba09a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L838) `TimeoutException` internally generated by the manager when expiring requests on poll (vs retrying on the request `TimeoutException` received as a response if the api level timeout was still not expired). But with this PR the manager does not do that anymore, so agree we should check isExpired and fail, no matter the exception. On top of that valid point from @cadonna, I think there's more to this. Before, the manager was internally throwing a TimeoutException, so at this point we could simply completeExceptionally with the same error (ln 441). But this also changes now. Similar to how the `TopicMetadataManager` does, if at this point we see that the request isExpired, I guess we need to throw a new TimeoutException (not the last known error, which is what is thrown now in ln 441) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
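A compact sketch of the behaviour argued for above: on a failed attempt, a non-retriable error is surfaced as-is, a retriable error within the deadline triggers a retry, and a retriable error after the deadline fails the future with a freshly created `TimeoutException` rather than the last error seen. Everything below (names, parameters, wiring) is illustrative, not the real `commitSyncWithRetries` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Sketch of the retry/expiry decision for a completed commit attempt.
final class CommitRetryDecisionSketch {

    static void onAttemptFailed(final Throwable error,
                                final boolean retriable,
                                final boolean expired,
                                final CompletableFuture<Void> result,
                                final Runnable retryWithBackoff) {
        if (!retriable) {
            result.completeExceptionally(error);                 // fatal: propagate as-is
        } else if (expired) {
            result.completeExceptionally(new TimeoutException(   // fresh timeout, not the last error
                    "commit could not complete before timeout expired"));
        } else {
            retryWithBackoff.run();                              // deadline not reached: retry
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        onAttemptFailed(new RuntimeException("REQUEST_TIMED_OUT"), true, true, result, () -> { });
        System.out.println(result.isCompletedExceptionally());   // true, with a TimeoutException cause
    }
}
```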
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624694081 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -747,21 +748,19 @@ public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() { new TopicPartition("topic", 1), new OffsetAndMetadata(0)); -// Send sync offset commit request that fails with retriable error. -long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; -CompletableFuture commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); -completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); +// Send sync offset commit request. +long deadlineMs = time.milliseconds() + retryBackoffMs * 2; +CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs); -// Request retried after backoff, and fails with retriable again. Should not complete yet +// Make the first request fail with a retriable error. Should not complete yet // given that the request timeout hasn't expired. time.sleep(retryBackoffMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); assertFalse(commitResult.isDone()); // Sleep to expire the request timeout. Request should fail on the next poll. time.sleep(retryBackoffMs); -NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); -assertEquals(0, res.unsentRequests.size()); +completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); Review Comment: uhm this is exactly my concern on [comment above](https://github.com/apache/kafka/pull/16031/files#r1624689065). Before this PR poll would throw timeout because it knows that the call to commitSync had a deadline that expired. Now it requires a response to the request saying that it timed out. Same outcome, but it needs to sit and wait for something it already knows. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624689065 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -61,30 +62,28 @@ */ public class TopicMetadataRequestManager implements RequestManager { +private final Time time; private final boolean allowAutoTopicCreation; private final List inflightRequests; +private final int requestTimeoutMs; private final long retryBackoffMs; private final long retryBackoffMaxMs; private final Logger log; private final LogContext logContext; -public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { +public TopicMetadataRequestManager(final LogContext context, final Time time, final ConsumerConfig config) { logContext = context; log = logContext.logger(getClass()); +this.time = time; inflightRequests = new LinkedList<>(); +requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); } @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { -// Prune any requests which have timed out -List expiredRequests = inflightRequests.stream() -.filter(req -> req.isExpired(currentTimeMs)) -.collect(Collectors.toList()); -expiredRequests.forEach(TopicMetadataRequestState::expire); - Review Comment: I totally agree that we shouldn't prune expired requests on poll before returning the requests (because that does not give requests with timeout 0 the single chance to run that we wanted). But not pruning at all on poll goes a bit farther than what I was expecting. It seems conceptually wrong that the manager will keep requests that it has sent out and knows are expired. It then relies on a timeout response to expire the request; the outcome is the same, but the timing is very different: the manager will wait for the full request_timeout before throwing a `TimeoutException` for a request that it knew was expired from the moment it first polled for it, right? (All that time it will keep the request on its queue as in-flight, skipping it on every poll just because it hasn't received a response.) I expect that tests that throw Timeout on poll would fail with this, and they would need to wait for a network timeout response that was not needed before so that the manager actually throws TimeoutException; that's what I'm referring to. Managers already apply expiration logic to requests: 1. on poll (before returning the requests) and on response (before this PR) 2. on response only (this PR) 3. on poll (after returning the requests) and on responses -> would this be a sensible compromise, so that we end up with managers that apply the expiration logic consistently wherever they can? It's closer to what we had before this PR, and also fixes the fire-and-forget behaviour we were missing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624549518 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -210,7 +194,7 @@ private NetworkClientDelegate.UnsentRequest createUnsentRequest( private void handleError(final Throwable exception, final long completionTimeMs) { if (exception instanceof RetriableException) { -if (completionTimeMs >= expirationTimeMs) { +if (isExpired()) { Review Comment: this logic of what we want to do on retriable errors seems to be the same for all retriable errors (no matter if found in `handleError` or `handleResponse`). Could we maybe consolidate it in a single place `maybeExpireOnRetriable` or something like it, that would be consistently applied whenever we need to maybe expire? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
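To make the suggested consolidation concrete, here is a minimal sketch of a shared helper along the lines of a `maybeExpireOnRetriable`, called from both an error path and a response path so the expiry rule lives in a single place. The surrounding class, fields, and method names are placeholders, not the actual manager code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Sketch: one helper owns the "expired while retriable" rule; both the error path and
// the response path call it instead of duplicating the check.
final class MaybeExpireOnRetriableSketch {

    private final long deadlineMs;
    private final CompletableFuture<String> future = new CompletableFuture<>();

    MaybeExpireOnRetriableSketch(final long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    /** Returns true if the request was expired and its future has been failed. */
    private boolean maybeExpireOnRetriable(final long completionTimeMs) {
        if (completionTimeMs >= deadlineMs) {
            future.completeExceptionally(new TimeoutException(
                    "request could not complete before timeout expired"));
            return true;
        }
        return false;
    }

    void handleError(final Throwable retriableError, final long completionTimeMs) {
        if (!maybeExpireOnRetriable(completionTimeMs)) {
            System.out.println("will retry after backoff: " + retriableError.getMessage());
        }
    }

    void handleResponse(final String retriableErrorCode, final long completionTimeMs) {
        if (!maybeExpireOnRetriable(completionTimeMs)) {
            System.out.println("will retry after backoff: " + retriableErrorCode);
        }
    }
}
```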
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624238381 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState { */ abstract CompletableFuture future(); -/** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. - */ -void maybeExpire(long currentTimeMs) { -if (retryTimeoutExpired(currentTimeMs)) { -removeRequest(); -isExpired = true; -future().completeExceptionally(new TimeoutException(requestDescription() + -" could not complete before timeout expired.")); -} -} - /** * Build request with the given builder, including response handling logic. */ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator()); +coordinatorRequestManager.coordinator(), +time.timer(requestTimeoutMs) +); request.whenComplete( (response, throwable) -> { long currentTimeMs = request.handler().completionTimeMs(); Review Comment: Not directly related to this PR: Why is this variable called `currentTimeMs`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + +private final Timer timer; + +public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { +super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); +this.timer = timer; +} + +// Visible for testing Review Comment: This does not seem to be true. The constructor is not called in tests and also the constructors that call this constructor are not called in tests. That is one reason I do not like this kind of comments, because they easily start to lie. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -235,7 +235,6 @@ public void addAll(final List requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); -r.setTimer(this.time, this.requestTimeoutMs); Review Comment: I guess it does not make a big difference if the timer is set when the unsent request is created vs. when the unsent request is added to the network client, right? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, result.complete(null); } else { if (error instanceof RetriableException) { -if (error instanceof TimeoutException && requestAttempt.isExpired) { +if (error instanceof TimeoutException && requestAttempt.isExpired()) { Review Comment: Why do we only check expiration in case of a `TimeoutException` but not for other retriable exceptions? The `TopicMetadataRequestManager` handles this differently. ## c
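As a simplified analogue of the `TimedRequestState` shown in the review above, the sketch below derives expiry from a deadline fixed at construction instead of tracking it with a separate mutable flag. It deliberately avoids Kafka's `Timer`/`Time` utilities so it stays self-contained; the real class presumably delegates these checks to the wrapped `Timer`.

```java
// Simplified, self-contained analogue of a timed request state (not the actual
// TimedRequestState, which wraps org.apache.kafka.common.utils.Timer).
final class TimedStateSketch {

    private final long deadlineMs;

    TimedStateSketch(final long currentTimeMs, final long timeoutMs) {
        this.deadlineMs = currentTimeMs + timeoutMs;
    }

    boolean isExpired(final long currentTimeMs) {
        return currentTimeMs >= deadlineMs;
    }

    long remainingMs(final long currentTimeMs) {
        return Math.max(0L, deadlineMs - currentTimeMs);
    }

    public static void main(String[] args) {
        TimedStateSketch state = new TimedStateSketch(0L, 100L);
        System.out.println(state.isExpired(50L));    // false
        System.out.println(state.remainingMs(50L));  // 50
        System.out.println(state.isExpired(100L));   // true
    }
}
```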
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout URL: https://github.com/apache/kafka/pull/16031 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558404 I'm going to close and reopen to force another build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout URL: https://github.com/apache/kafka/pull/16031 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558063 @lianetm—relevant test failures have been addressed. There are three unrelated test failures from flaky tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
lianetm commented on PR #16031: URL: https://github.com/apache/kafka/pull/16031#issuecomment-2137670350 Hey @kirktrue, thanks for the changes, I'll take a look, but I notice several consumer integration tests are failing here for commits, are we still missing changes/fixes maybe? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1618010550 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java: ## @@ -133,43 +131,6 @@ public void testAllTopicsExceptionAndInflightRequests(final Errors error, final } } -@Test -public void testExpiringRequest() { -String topic = "hello"; - -// Request topic metadata with 1000ms expiration -long now = this.time.milliseconds(); -CompletableFuture>> future = -this.topicMetadataRequestManager.requestTopicMetadata(topic, now + 1000L); -assertEquals(1, this.topicMetadataRequestManager.inflightRequests().size()); - -// Poll the request manager to get the list of requests to send -// - fail the request with a RetriableException -NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds()); -assertEquals(1, res.unsentRequests.size()); - res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse( -res.unsentRequests.get(0), -topic, -Errors.REQUEST_TIMED_OUT)); - -// Sleep for long enough to exceed the backoff delay but still within the expiration -// - fail the request again with a RetriableException -this.time.sleep(500); -res = this.topicMetadataRequestManager.poll(this.time.milliseconds()); -assertEquals(1, res.unsentRequests.size()); - res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse( -res.unsentRequests.get(0), -topic, -Errors.REQUEST_TIMED_OUT)); - -// Sleep for long enough to expire the request which should fail -this.time.sleep(1000); -res = this.topicMetadataRequestManager.poll(this.time.milliseconds()); -assertEquals(0, res.unsentRequests.size()); -assertEquals(0, this.topicMetadataRequestManager.inflightRequests().size()); -assertTrue(future.isCompletedExceptionally()); -} - Review Comment: This test exercises the preemptive pruning, which we no longer use. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1618010264 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java: ## @@ -123,23 +122,6 @@ public void testEnsureCorrectCompletionTimeOnComplete() { assertEquals(timeMs, unsentRequest.handler().completionTimeMs()); } -@Test -public void testEnsureTimerSetOnAdd() { -NetworkClientDelegate ncd = newNetworkClientDelegate(); -NetworkClientDelegate.UnsentRequest findCoordRequest = newUnsentFindCoordinatorRequest(); -assertNull(findCoordRequest.timer()); - -// NetworkClientDelegate#add -ncd.add(findCoordRequest); -assertEquals(1, ncd.unsentRequests().size()); -assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs()); - -// NetworkClientDelegate#addAll -ncd.addAll(Collections.singletonList(findCoordRequest)); -assertEquals(1, ncd.unsentRequests().size()); -assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs()); -} - Review Comment: The `Timer` is provided via the constructor, so this test is no longer needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1618009815 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -779,16 +778,17 @@ public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExp new TopicPartition("topic", 1), new OffsetAndMetadata(0)); -// Send offset commit request that fails with retriable error. -long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; -CompletableFuture commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); +// Send offset commit request. +long deadlineMs = time.milliseconds() + retryBackoffMs * 2; +CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs); + +// Make the first request fail due to a missing coordinator. Review Comment: Same as above: this code was updated now that we no longer preemptively prune expired requests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1618009570 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -747,21 +748,19 @@ public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() { new TopicPartition("topic", 1), new OffsetAndMetadata(0)); -// Send sync offset commit request that fails with retriable error. -long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; -CompletableFuture commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); -completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); +// Send sync offset commit request. +long deadlineMs = time.milliseconds() + retryBackoffMs * 2; +CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs); Review Comment: This logic needed to be revised as the previous code relied on the preemptive pruning of expired requests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
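The revised flow can be illustrated with a self-contained sketch using a fake clock: without preemptive pruning, the commit future only fails once a failed attempt is observed after the deadline. The `FakeClock`, the `failAttempt` helper, and the constants below are stand-ins for the real test fixtures, not the actual `CommitRequestManagerTest` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Sketch of the revised test flow: the future fails only after a failed attempt is
// observed past the deadline, rather than being expired preemptively on poll.
final class RetryUntilDeadlineSketch {

    static final class FakeClock {
        private long nowMs;
        long milliseconds() { return nowMs; }
        void sleep(long ms) { nowMs += ms; }
    }

    public static void main(String[] args) {
        FakeClock time = new FakeClock();
        long retryBackoffMs = 100;
        long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
        CompletableFuture<Void> commitResult = new CompletableFuture<>();

        // First attempt fails with a retriable error before the deadline: keep retrying.
        time.sleep(retryBackoffMs);
        failAttempt(commitResult, deadlineMs, time.milliseconds());
        System.out.println("done after first failure? " + commitResult.isDone());   // false

        // Second attempt fails at/after the deadline: the future fails with a timeout.
        time.sleep(retryBackoffMs);
        failAttempt(commitResult, deadlineMs, time.milliseconds());
        System.out.println("done after deadline?      " + commitResult.isDone());   // true
    }

    static void failAttempt(CompletableFuture<Void> result, long deadlineMs, long nowMs) {
        if (nowMs >= deadlineMs) {
            result.completeExceptionally(new TimeoutException(
                    "commit could not complete before timeout expired"));
        }
        // Not expired: the (omitted) retry path would schedule another attempt after backoff.
    }
}
```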
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1618009040 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -61,30 +62,28 @@ */ public class TopicMetadataRequestManager implements RequestManager { +private final Time time; private final boolean allowAutoTopicCreation; private final List inflightRequests; +private final int requestTimeoutMs; private final long retryBackoffMs; private final long retryBackoffMaxMs; private final Logger log; private final LogContext logContext; -public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { +public TopicMetadataRequestManager(final LogContext context, final Time time, final ConsumerConfig config) { logContext = context; log = logContext.logger(getClass()); +this.time = time; inflightRequests = new LinkedList<>(); +requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); } @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { -// Prune any requests which have timed out -List expiredRequests = inflightRequests.stream() -.filter(req -> req.isExpired(currentTimeMs)) -.collect(Collectors.toList()); -expiredRequests.forEach(TopicMetadataRequestState::expire); - Review Comment: As with `CommitRequestManager`, we don't proactively prune requests. Instead, we wait for the `NetworkClient` to invoke our callback with a `TimeoutException` so there's a single path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
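To show the "single path" idea in isolation, here is a sketch where every outcome of a request (success, failure, or client-side timeout) arrives through one completion callback, and the timeout case is classified there rather than being expired separately by the manager. The names and the `CompletableFuture`-based wiring are illustrative assumptions, not the actual `NetworkClientDelegate` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Sketch: the manager never expires a sent request itself; it waits for the network
// layer to complete the request (possibly with a TimeoutException) and reacts in one
// callback.
final class SingleCompletionPathSketch {

    static CompletableFuture<String> track(final CompletableFuture<String> networkCompletion,
                                           final String requestDescription) {
        CompletableFuture<String> result = new CompletableFuture<>();
        networkCompletion.whenComplete((response, throwable) -> {
            if (throwable instanceof TimeoutException) {
                // The client-side timeout arrives here like any other failure.
                result.completeExceptionally(new TimeoutException(
                        requestDescription + " could not complete before timeout expired"));
            } else if (throwable != null) {
                result.completeExceptionally(throwable);
            } else {
                result.complete(response);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> network = new CompletableFuture<>();
        CompletableFuture<String> tracked = track(network, "OffsetFetch request");
        network.completeExceptionally(new TimeoutException("request timed out"));
        System.out.println(tracked.isCompletedExceptionally()); // true
    }
}
```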
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1618007950 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -235,7 +235,6 @@ public void addAll(final List requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); -r.setTimer(this.time, this.requestTimeoutMs); Review Comment: We provide the `Timer` when creating the `UnsentRequest`, so we don't need to add it afterward. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org