Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
lucasbru merged PR #14842: URL: https://github.com/apache/kafka/pull/14842 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
lucasbru commented on PR #14842: URL: https://github.com/apache/kafka/pull/14842#issuecomment-1831602145 Test failures unrelated and mapped to existing/new flaky test issues.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
cadonna commented on PR #14842: URL: https://github.com/apache/kafka/pull/14842#issuecomment-1831462346 I totally agree on not using spies. That is my opinion, and the Mockito documentation also says to be very careful with spies and to use them only when absolutely needed. Spies are mostly used to test legacy code and should not be used for new code. We should definitely not wrap the code under test in spies. In my current PR, a unit test was not working because of this wrapping. I started to improve the test code in a separate PR, but the PRs for the deadline have higher priority.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1830970851

Here are the failures:
```
Build / JDK 21 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 42s
Build / JDK 21 and Scala 2.13 / testResetSinkConnectorOffsetsZombieSinkTasks – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest 58s
Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 6s
Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=kraft – integration.kafka.server.FetchFromFollowerIntegrationTest 6s
Build / JDK 21 and Scala 2.13 / testWithGroupMetadata() – kafka.api.TransactionsBounceTest 13s
Build / JDK 21 and Scala 2.13 / testWithGroupMetadata() – kafka.api.TransactionsBounceTest 14s
Build / JDK 21 and Scala 2.13 / testFailureToFenceEpoch(String).quorum=kraft – kafka.api.TransactionsTest 42s
Build / JDK 17 and Scala 2.13 / testWithGroupId() – kafka.api.TransactionsBounceTest 12s
Build / JDK 17 and Scala 2.13 / testWithGroupId() – kafka.api.TransactionsBounceTest 9s
Build / JDK 17 and Scala 2.13 / testWrongIncarnationId() – kafka.server.ControllerRegistrationManagerTest 30s
Build / JDK 17 and Scala 2.13 / testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest <1s
Build / JDK 17 and Scala 2.13 / testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).quorum=zk – org.apache.kafka.tools.TopicCommandIntegrationTest 3s
Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 42s
Build / JDK 8 and Scala 2.12 / testAlterSinkConnectorOffsetsZombieSinkTasks – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest 24s
Build / JDK 8 and Scala 2.12 / testWithGroupMetadata() – kafka.api.TransactionsBounceTest 12s
Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 15s
Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 2m 51s
Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest 2m 28s
Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest 4m 26s
Build / JDK 11 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 16s
```
They are unrelated, but it seems there is a movement toward not merging failing builds.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408238807

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##
@@ -234,11 +235,20 @@ public void testAutocommit_ResendAutocommitAfterException() {
     @Test
     public void testAutocommit_EnsureOnlyOneInflightRequest() {
+        TopicPartition t1p = new TopicPartition("topic1", 0);
+        subscriptionState.assignFromUser(singleton(t1p));
+        //subscriptionState.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));

Review Comment:
oh yes... 🤦
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408236793

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ##
@@ -269,60 +275,56 @@ void cleanup() {
      * completed in time.
      */
     // Visible for testing
-    void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+    void maybeAutocommitOnClose(final Timer timer) {
         if (!requestManagers.coordinatorRequestManager.isPresent())
             return;

+        if (!requestManagers.commitRequestManager.isPresent()) {
+            log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
+            return;
+        }
+
+        if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+            return;
+        }
+
         ensureCoordinatorReady(timer);
-        List tasks = closingRequests();
-        networkClientDelegate.addAll(tasks);
+        List autocommitRequest =

Review Comment:
Cannot agree more!
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408236102

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -210,17 +210,19 @@ public CompletableFuture maybeAutoCommitAllConsumed() {
         return maybeAutoCommit(subscriptions.allConsumed());
     }

+    boolean canAutoCommit() {
+        return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty();
+    }
+
     /**
-     * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+     * Returns an OffsetCommitRequest of all assigned topicPartitions and their current positions.
      */
-    Optional maybeCreateAutoCommitRequest() {
-        if (!autoCommitState.isPresent()) {
-            return Optional.empty();
-        }
-
-        OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+    NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {

Review Comment:
good idea
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408235006

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
      * completed future if no request is generated.
      */
     public CompletableFuture maybeAutoCommit(final Map offsets) {
-        if (!autoCommitState.isPresent()) {
+        if (!canAutoCommit()) {

Review Comment:
In fact, let me change the name of
```
public boolean canSendAutocommit() {
    return !this.hasInflightCommit && this.timer.isExpired();
}
```
to `shouldAutoCommit`, because when the timer has expired, we should send the autocommit.
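The condition behind the proposed `shouldAutoCommit` name (no commit in flight and the interval timer expired) can be sketched in isolation. This is a minimal illustration only: `AutoCommitTimerState`, its fields, and the explicit `nowMs` parameters are hypothetical stand-ins, not the Kafka client's actual classes.

```java
// Hypothetical sketch of the "should we auto-commit now?" check; not Kafka code.
final class AutoCommitTimerState {
    private final long intervalMs;
    private long nextCommitAtMs;
    private boolean hasInflightCommit;

    AutoCommitTimerState(long nowMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextCommitAtMs = nowMs + intervalMs;
    }

    // True only when no commit is in flight AND the interval has elapsed.
    boolean shouldAutoCommit(long nowMs) {
        return !hasInflightCommit && nowMs >= nextCommitAtMs;
    }

    // Called when a commit request has been handed to the network layer.
    void onCommitSent() {
        hasInflightCommit = true;
    }

    // Called when the commit response arrives; restarts the interval.
    void onCommitCompleted(long nowMs) {
        hasInflightCommit = false;
        nextCommitAtMs = nowMs + intervalMs;
    }
}
```

Passing the current time in explicitly keeps the check deterministic in a unit test, without a real clock.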
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
lucasbru commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1829919586

> Hi @lucasbru - I just opened the PR for you to review. I'm not 100% happy with the way tests are setup therefore I made some changes around optionally disabling autocommit in the network thread. Also, I feel the tests here kind of become some sort of integration testing. I thought it kind of against the unit test philosophy.

Yeah. I think the tests may be using `spy` too much and should use `mock` instead. They definitely run too much code, which makes them more like integration tests, but they still introspect a lot, which makes them more like unit tests. But it's not something that we are going to fix in this PR. I wonder if we can create a ticket to migrate the `AsyncKafkaConsumerTest` to a more mocking-based approach; is that something that would be feasible in a reasonable amount of time? cc @cadonna, who always has strong opinions on unit testing.

> 1. We will only auto commit if the configuration is enabled (by default) or if we've got anything to commit at all

Makes sense

> 2. We need to enforce finding a coordinator and send an autocommit regardless of the previous commit state because we need to make sure to record the progress before closing

Makes sense

> 3. Quite a bit of changes to the testing, because autocommit depends on the current progress, so I need to "seek" for some cases to ensure the test sends an autocommit

Makes sense
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
lucasbru commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407524109

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ##
@@ -269,60 +275,56 @@ void cleanup() {
      * completed in time.
      */
     // Visible for testing
-    void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+    void maybeAutocommitOnClose(final Timer timer) {
         if (!requestManagers.coordinatorRequestManager.isPresent())
             return;

+        if (!requestManagers.commitRequestManager.isPresent()) {
+            log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
+            return;
+        }
+
+        if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+            return;
+        }
+
         ensureCoordinatorReady(timer);
-        List tasks = closingRequests();
-        networkClientDelegate.addAll(tasks);
+        List autocommitRequest =

Review Comment:
Let's not use a list here, then we can drop the `get(0)` below. You can create the list in place in the line below or add a `NetworkClientDelegate.add` method.

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -796,5 +823,45 @@ private HashMap mockTimestampToSearch() {
         timestampToSearch.put(t1, 2L);
         return timestampToSearch;
     }
+
+    private void prepAutocommitOnClose() {
+        Node node = testBuilder.metadata.fetch().nodes().get(0);
+        testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node));
+        if (!testBuilder.subscriptions.allConsumed().isEmpty()) {
+            List topicPartitions = new ArrayList<>(testBuilder.subscriptions.assignedPartitionsList());
+            testBuilder.client.prepareResponse(mockAutocommitResponse(
+                topicPartitions,
+                (short) 1,
+                Errors.NONE).responseBody());
+        }
+    }
+    private ClientResponse mockAutocommitResponse(final List topicPartitions,

Review Comment:
nit: newline in between the methods

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
      * completed future if no request is generated.
      */
     public CompletableFuture maybeAutoCommit(final Map offsets) {
-        if (!autoCommitState.isPresent()) {
+        if (!canAutoCommit()) {

Review Comment:
maybe `shouldAutoCommit`? Because it's more about being obliged to commit and not about being able to commit? Also, try to be consistent with `AutoCommit` vs. `Autocommit`.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -210,17 +210,19 @@ public CompletableFuture maybeAutoCommitAllConsumed() {
         return maybeAutoCommit(subscriptions.allConsumed());
     }

+    boolean canAutoCommit() {
+        return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty();
+    }
+
     /**
-     * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+     * Returns an OffsetCommitRequest of all assigned topicPartitions and their current positions.
      */
-    Optional maybeCreateAutoCommitRequest() {
-        if (!autoCommitState.isPresent()) {
-            return Optional.empty();
-        }
-
-        OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+    NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {

Review Comment:
Again, maybe make clear that this only creates the request. Also, in other methods you use `AllConsumed` and drop the `Positions`, so maybe `createCommitAllConsumedRequest`?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##
@@ -234,11 +235,20 @@ public void testAutocommit_ResendAutocommitAfterException() {
     @Test
     public void testAutocommit_EnsureOnlyOneInflightRequest() {
+        TopicPartition t1p = new TopicPartition("topic1", 0);
+        subscriptionState.assignFromUser(singleton(t1p));
+        //subscriptionState.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));

Review Comment:
nit: maybe remove the commented line if we do not need it
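The first suggestion above (hold the single request directly and build the list only where `addAll` needs one) might look roughly like the sketch below. `Request` and `Delegate` are hypothetical stand-ins for the unsent-request and network-delegate types; only the shape of the call is meant to carry over.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; these are not the Kafka client's classes.
final class SingleRequestSketch {

    // Stand-in for an unsent request that exposes a completion future.
    static final class Request {
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        CompletableFuture<Void> future() {
            return future;
        }
    }

    // Stand-in for a delegate that only offers a list-based addAll.
    static final class Delegate {
        final List<Request> queue = new ArrayList<>();

        void addAll(List<Request> requests) {
            queue.addAll(requests);
        }
    }

    // Keeps a direct reference to the request and wraps it in a list in place,
    // so the caller can poll request.future().isDone() without any get(0).
    static Request enqueueAutocommit(Delegate delegate) {
        Request autocommit = new Request();
        delegate.addAll(Collections.singletonList(autocommit));
        return autocommit;
    }
}
```

The caller's loop condition then reads `!autocommit.future().isDone()` instead of indexing into a one-element list.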
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198957

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ##
@@ -269,60 +275,56 @@ void cleanup() {
      * completed in time.
      */
     // Visible for testing
-    void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+    void maybeAutocommitOnClose(final Timer timer) {
         if (!requestManagers.coordinatorRequestManager.isPresent())
             return;

+        if (!requestManagers.commitRequestManager.isPresent()) {
+            log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
+            return;
+        }
+
+        if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+            return;
+        }
+
         ensureCoordinatorReady(timer);
-        List tasks = closingRequests();
-        networkClientDelegate.addAll(tasks);
+        List autocommitRequest =
+            Collections.singletonList(requestManagers.commitRequestManager.get().commitAllConsumedPositions());
+        networkClientDelegate.addAll(autocommitRequest);
         do {
             long currentTimeMs = timer.currentTimeMs();
             ensureCoordinatorReady(timer);
             networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
-        } while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+        } while (timer.notExpired() && !autocommitRequest.get(0).future().isDone());
+    }
+
+    void maybeLeaveGroup(final Timer timer) {
+        // TODO: Leave group upon closing the consumer
     }

     private void ensureCoordinatorReady(final Timer timer) {
-        while (!coordinatorReady()) {
+        while (!coordinatorReady() && timer.notExpired()) {
             findCoordinatorSync(timer);
         }
     }

     private boolean coordinatorReady() {
-        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow(
+            () -> new IllegalStateException("CoordinatorRequestManager uninitialized."));

Review Comment:
Probably unnecessary, but I'm adding this as a safety check. WDYT?
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198120

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -210,17 +210,19 @@ public CompletableFuture maybeAutoCommitAllConsumed() {
         return maybeAutoCommit(subscriptions.allConsumed());
     }

+    boolean canAutoCommit() {

Review Comment:
Unhappy with the naming, but can't seem to find a better way to restructure this at the moment. Didn't like it because there is an infinite variation of canXxxx().
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406787063

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -154,7 +154,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent())
             return EMPTY;

-        maybeAutoCommitAllConsumed();
+        maybeAutoCommit();

Review Comment:
We always commit allConsumed(), so there's no point in restating that in the name.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1828686697

Hi @lucasbru - I just opened the PR for you to review. I'm not 100% happy with the way the tests are set up, so I made some changes around optionally disabling autocommit in the network thread. Also, I feel the tests here have become a kind of integration testing, which I think goes against the unit test philosophy.

In summary, the changes are:
1. We will only auto commit if the configuration is enabled (it is by default) and there is something to commit.
2. We need to enforce finding a coordinator and send an autocommit regardless of the previous commit state, because we need to make sure to record the progress before closing.
3. Quite a few changes to the tests: because autocommit depends on the current progress, I need to "seek" in some cases to ensure the test sends an autocommit.

Let me know what you think!
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406792003

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java: ##
@@ -279,7 +287,7 @@ public ConsumerNetworkThreadTestBuilder(Optional groupInfo) {

         @Override
         public void close() {
-            closeQuietly(consumerNetworkThread, ConsumerNetworkThread.class.getSimpleName());
+            consumerNetworkThread.close();

Review Comment:
I don't think we should suppress the failures on closing during testing.
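To illustrate why a quiet close can hide problems in a test, here is a minimal sketch. `CloseSketch` and both helpers are hypothetical and written for this note; they are not Kafka's utility methods, whose exact behavior (for example, logging) is not shown in this thread.

```java
// Hypothetical sketch contrasting a swallowing close with a propagating one.
final class CloseSketch {

    // Swallows any failure from close(): the calling test keeps passing
    // even if shutdown is broken.
    static void closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
        } catch (Exception e) {
            // Intentionally ignored in this sketch.
        }
    }

    // Lets the failure propagate so the calling test fails visibly.
    static void closeStrict(AutoCloseable resource) throws Exception {
        resource.close();
    }
}
```

With the strict variant, a regression in the shutdown path surfaces as a test failure instead of passing silently.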
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406789593

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -206,21 +206,21 @@ public CompletableFuture maybeAutoCommit(final Map maybeAutoCommitAllConsumed() {
+    public CompletableFuture maybeAutoCommit() {
         return maybeAutoCommit(subscriptions.allConsumed());
     }

+    boolean canAutoCommit() {
+        return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty();
+    }
+
     /**
-     * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+     * Return an OffsetCommitRequest of all assigned topicPartitions and their current positions.
      */
-    Optional maybeCreateAutoCommitRequest() {
-        if (!autoCommitState.isPresent()) {
-            return Optional.empty();
-        }
-
+    NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {
         OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
         request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed()));
-        return Optional.of(request.toUnsentRequest());
+        return request.toUnsentRequest();

Review Comment:
We should always return a request because I moved that check out of it.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406788484

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
      * completed future if no request is generated.
      */
     public CompletableFuture maybeAutoCommit(final Map offsets) {
-        if (!autoCommitState.isPresent()) {
+        if (!canAutoCommit()) {

Review Comment:
Pretty terrible naming because it kind of overlaps with the one below; not sure there's a better description for it. It needs to check 1. if autocommit is enabled and 2. if there's anything to commit. If either check fails, we don't try to send a commit.
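The two checks described above can be sketched as a single predicate. This is an illustration under assumptions: `AutoCommitGate` is a hypothetical name, an `Optional<Long>` interval stands in for "autocommit is enabled", and a `Map<String, Long>` stands in for the consumed positions.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the two-part gate; not the Kafka client's types.
final class AutoCommitGate {

    // A commit is attempted only if autocommit is configured (interval present)
    // AND at least one consumed position exists. Failing either check skips it.
    static boolean canAutoCommit(Optional<Long> autoCommitIntervalMs,
                                 Map<String, Long> allConsumed) {
        return autoCommitIntervalMs.isPresent() && !allConsumed.isEmpty();
    }
}
```

Keeping both conditions in one predicate lets the periodic path and the on-close path share the same gate.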
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406788484

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
      * completed future if no request is generated.
      */
     public CompletableFuture maybeAutoCommit(final Map offsets) {
-        if (!autoCommitState.isPresent()) {
+        if (!canAutoCommit()) {

Review Comment:
Pretty terrible wording; not sure there's a better description for it. It needs to check 1. if autocommit is enabled and 2. if there's anything to commit. If either check fails, we don't try to send a commit.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406699542

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ##
@@ -269,22 +270,40 @@ void cleanup() {
      * completed in time.
      */
     // Visible for testing
-    void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+    void maybeAutocommitOnClose(final Timer timer) {
         if (!requestManagers.coordinatorRequestManager.isPresent())
             return;

+        if (!requestManagers.commitRequestManager.isPresent()) {
+            log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
+            return;
+        }
+
+        if (!requestManagers.commitRequestManager.get().autoCommitEnabled()) {
+            return;
+        }
+
         ensureCoordinatorReady(timer);
-        List tasks = closingRequests();
-        networkClientDelegate.addAll(tasks);
+        Optional autocommit = requestManagers.commitRequestManager.get().maybeCreateAutoCommitRequest();
+        if (!autocommit.isPresent()) {
+            return;
+        }
+
+        List autocommitRequest = Collections.singletonList(autocommit.get());
+        networkClientDelegate.addAll(autocommitRequest);
         do {
             long currentTimeMs = timer.currentTimeMs();
             ensureCoordinatorReady(timer);
             networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
-        } while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+        } while (timer.notExpired() && !autocommitRequest.get(0).future().isDone());
+    }
+
+    void maybeLeaveGroup(final Timer timer) {
+        // TODO: Leave group upon closing the consumer

Review Comment:
Can I follow up with a subsequent ticket to address this?
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
lucasbru commented on PR #14842: URL: https://github.com/apache/kafka/pull/14842#issuecomment-1827393124 @philipnee Sounds good. Let me know when it's ready for review.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on PR #14842: URL: https://github.com/apache/kafka/pull/14842#issuecomment-1827329339 Hi @lucasbru - This PR should have addressed the issue in KAFKA-15887. I also fixed a couple of issues while reviewing the code.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1405778947

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -218,6 +218,10 @@ Optional maybeCreateAutoCommitRequest() {
             return Optional.empty();
         }

+        if (subscriptions.allConsumed().isEmpty()) {

Review Comment:
For async request, we don't try to create one if there's no progress being made
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1405771339

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ##
@@ -254,8 +255,9 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
-        maybeAutoCommitAndLeaveGroup(timer);
+        maybeAutocommitOnClose(timer);

Review Comment:
After reviewing the ConsumerCoordinator, I split the original method into two functions: We should try to send an autocommit first and leave group last.
[PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee opened a new pull request, #14842:
URL: https://github.com/apache/kafka/pull/14842

A few bugs were introduced by the previous issues. These are:
1. During testing or in some edge cases, the coordinator request manager might hold on to an inflight request forever. Therefore, when invoking coordinatorRequestManager.poll(), nothing would be returned. Here we explicitly create a FindCoordinatorRequest regardless of the current request state because we want to actively search for a coordinator.
2. ensureCoordinatorReady() might get stuck in an infinite loop if the client fails to find a coordinator. Even though the consumer would be able to shut down eventually, this is undesirable.
3. The current asyncConsumerTest mixes background/network thread shutdown with the consumer shutdown. As the goal of the module is unit testing, we should try to test the shutdown procedure separately. Therefore, this PR adds a Mockito.doAnswer call to applicationEventHandler.close(). Tests that exercise shutdown call shutdown() explicitly.
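The second point (a lookup loop that must not spin forever) comes down to bounding the loop with a deadline. The sketch below shows that pattern in isolation, assuming an injected clock so it can be tested deterministically; `BoundedCoordinatorLookup` and its parameters are hypothetical and do not mirror the Kafka client's `Timer` API.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical sketch of a deadline-bounded retry loop; not Kafka code.
final class BoundedCoordinatorLookup {

    /**
     * Runs findCoordinatorOnce until coordinatorReady reports true or the
     * timeout elapses. Returns whether the coordinator was ready at the end.
     */
    static boolean ensureReady(BooleanSupplier coordinatorReady,
                               Runnable findCoordinatorOnce,
                               LongSupplier clockMs,
                               long timeoutMs) {
        long deadlineMs = clockMs.getAsLong() + timeoutMs;
        // Both conditions are needed: without the deadline check the loop
        // never exits when no coordinator can be found.
        while (!coordinatorReady.getAsBoolean() && clockMs.getAsLong() < deadlineMs) {
            findCoordinatorOnce.run();
        }
        return coordinatorReady.getAsBoolean();
    }
}
```

Returning the final readiness lets the caller decide whether to skip the commit or log the timeout, rather than assuming success.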