[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889350182 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. -if (coordinatorUnknownAndUnready(timer)) { -return false; +if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { Review Comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889351949 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); -// should find an available node in next find coordinator request +// should not try to find coordinator since we are in manual assignment +// hence the prepared response should not be returned client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); +assertTrue(coordinator.coordinatorUnknown()); +} + +@Test +public void testAutoCommitAsyncWithUserAssignedType() { +try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions) +) { Review Comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889361765 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); -// should find an available node in next find coordinator request +// should not try to find coordinator since we are in manual assignment +// hence the prepared response should not be returned client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); +assertTrue(coordinator.coordinatorUnknown()); +} + +@Test +public void testAutoCommitAsyncWithUserAssignedType() { +try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions) +) { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error +client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); +// set timeout to 0 because we don't want to retry after the error +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); + +// elapse auto commit interval and set committable position +time.sleep(autoCommitIntervalMs); +subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L)); + +// should try to find coordinator since we are auto committing +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.poll(time.timer(Long.MAX_VALUE)); +assertFalse(coordinator.coordinatorUnknown()); +} +} + +@Test +public void testCommitAsyncWithUserAssignedType() { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error +client.prepareResponse(groupCoordinatorResponse(node, 
Errors.COORDINATOR_NOT_AVAILABLE)); +// set timeout to 0 because we don't want to retry after the error +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); Review Comment: Ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889363536 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -548,14 +549,18 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { } } } else { -// For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata. +// For manually assigned partitions, we do not try to pro-actively lookup coordinator; +// instead we only try to refresh metadata when necessary. // If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. -if (coordinatorUnknownAndUnready(timer)) { -return false; +if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { +client.awaitMetadataUpdate(timer); } + +// if there is pending coordinator requests, ensure they have a chance to be transmitted. Review Comment: This is a major change made while addressing @dajac's comment: previously, under manual assignment, the `coordinator.poll` call would not call `networkClient.poll`. This means that if the coordinator discovery did not complete within `commitAsync` (note that we call `networkClient.poll` twice in that function, so it is possible for that function to complete the discovery), there would be no other place where the network client is polled to complete the pending requests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r890401527 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,10 +514,62 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); -// should find an available node in next find coordinator request +// should not try to find coordinator since we are in manual assignment +// hence the prepared response should not be returned client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); +assertTrue(coordinator.coordinatorUnknown()); +} + +@Test +public void testAutoCommitAsyncWithUserAssignedType() { +try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// set timeout to 0 because we expect no requests sent +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); +assertFalse(client.hasInFlightRequests()); + +// elapse auto commit interval and set committable position +time.sleep(autoCommitIntervalMs); +subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L)); + +// should try to find coordinator since we are auto committing +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); +assertTrue(client.hasInFlightRequests()); + +client.respond(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.poll(time.timer(0)); +assertFalse(coordinator.coordinatorUnknown()); +// after we've discovered the coordinator we should send +// out the commit request immediately +assertTrue(client.hasInFlightRequests()); +} +} + +@Test +public void testCommitAsyncWithUserAssignedType() { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// set timeout to 0 because we expect no 
requests sent +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); +assertFalse(client.hasInFlightRequests()); + +// should try to find coordinator since we are commit async +coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> { +throw new AssertionError("Commit should not get responses"); Review Comment: ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org