[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889350182


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
 // requests result in polls returning immediately, causing a tight loop of polls. Without
 // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
 // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-if (coordinatorUnknownAndUnready(timer)) {
-return false;
+if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889351949


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());
 
-// should find an available node in next find coordinator request
+// should not try to find coordinator since we are in manual assignment
+// hence the prepared response should not be returned
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.poll(time.timer(Long.MAX_VALUE));
+assertTrue(coordinator.coordinatorUnknown());
+}
+
+@Test
+public void testAutoCommitAsyncWithUserAssignedType() {
+try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+) {

Review Comment:
   ack






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889361765


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());
 
-// should find an available node in next find coordinator request
+// should not try to find coordinator since we are in manual assignment
+// hence the prepared response should not be returned
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.poll(time.timer(Long.MAX_VALUE));
+assertTrue(coordinator.coordinatorUnknown());
+}
+
+@Test
+public void testAutoCommitAsyncWithUserAssignedType() {
+try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+) {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+// set timeout to 0 because we don't want to retry after the error
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());
+
+// elapse auto commit interval and set committable position
+time.sleep(autoCommitIntervalMs);
+subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+// should try to find coordinator since we are auto committing
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(Long.MAX_VALUE));
+assertFalse(coordinator.coordinatorUnknown());
+}
+}
+
+@Test
+public void testCommitAsyncWithUserAssignedType() {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+// set timeout to 0 because we don't want to retry after the error
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());

Review Comment:
   Ack






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889363536


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -548,14 +549,18 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
 }
 }
 } else {
-// For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
+// For manually assigned partitions, we do not try to pro-actively lookup coordinator;
+// instead we only try to refresh metadata when necessary.
 // If connections to all nodes fail, wakeups triggered while attempting to send fetch
 // requests result in polls returning immediately, causing a tight loop of polls. Without
 // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
 // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-if (coordinatorUnknownAndUnready(timer)) {
-return false;
+if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
+client.awaitMetadataUpdate(timer);
 }
+
+// if there is pending coordinator requests, ensure they have a chance to be transmitted.

Review Comment:
   This is a major change made while addressing @dajac's comment. Previously,
   under manual assignment, the `coordinator.poll` call would not call
   `networkClient.poll`. That means that if the coordinator discovery does not
   complete within `commitAsync` (note that we call `networkClient.poll` twice in
   that function, so it is possible for the discovery to complete there), we
   would have no other place to poll the network client and complete the pending
   requests.
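
   To illustrate the concern, here is a minimal, self-contained sketch. It is
   not the Kafka code: `ToyNetworkClient`, `ToyCoordinator`, and
   `discoveryCompletes` are hypothetical names made up for this example, and the
   toy `commitAsync` only enqueues the discovery request without polling (the
   real one also polls the network client, as noted above). It only shows that a
   pending request never completes unless something polls the client afterwards.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PendingRequestSketch {

    /** Hypothetical stand-in for the network client: responses are only handled inside poll(). */
    static final class ToyNetworkClient {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        void send(Runnable onResponse) {
            pending.add(onResponse);
        }

        boolean hasPendingRequests() {
            return !pending.isEmpty();
        }

        void poll() {
            Runnable handler;
            while ((handler = pending.poll()) != null) {
                handler.run();
            }
        }
    }

    /** Hypothetical stand-in for the coordinator under manual assignment. */
    static final class ToyCoordinator {
        private final ToyNetworkClient client;
        private final boolean pollClientWhenPending;
        private boolean coordinatorKnown;

        ToyCoordinator(ToyNetworkClient client, boolean pollClientWhenPending) {
            this.client = client;
            this.pollClientWhenPending = pollClientWhenPending;
        }

        /** Simplification: only enqueues the discovery request, never polls. */
        void commitAsync() {
            if (!coordinatorKnown) {
                client.send(() -> coordinatorKnown = true);
            }
        }

        /** Polls the client only when the flag is set and a request is pending. */
        void poll() {
            if (pollClientWhenPending && client.hasPendingRequests()) {
                client.poll();
            }
        }

        boolean coordinatorUnknown() {
            return !coordinatorKnown;
        }
    }

    /** Runs one commitAsync followed by one poll and reports whether discovery finished. */
    static boolean discoveryCompletes(boolean pollClientWhenPending) {
        ToyNetworkClient client = new ToyNetworkClient();
        ToyCoordinator coordinator = new ToyCoordinator(client, pollClientWhenPending);
        coordinator.commitAsync();
        coordinator.poll();
        return !coordinator.coordinatorUnknown();
    }

    public static void main(String[] args) {
        System.out.println("without client poll: " + discoveryCompletes(false)); // false
        System.out.println("with client poll: " + discoveryCompletes(true));     // true
    }
}
```

   With the flag off, the discovery request stays queued indefinitely, which is
   the situation the added "ensure they have a chance to be transmitted" branch
   is meant to avoid.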






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-06 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r890401527


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,10 +514,62 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());
 
-// should find an available node in next find coordinator request
+// should not try to find coordinator since we are in manual assignment
+// hence the prepared response should not be returned
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.poll(time.timer(Long.MAX_VALUE));
+assertTrue(coordinator.coordinatorUnknown());
+}
+
+@Test
+public void testAutoCommitAsyncWithUserAssignedType() {
+try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// set timeout to 0 because we expect no requests sent
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());
+assertFalse(client.hasInFlightRequests());
+
+// elapse auto commit interval and set committable position
+time.sleep(autoCommitIntervalMs);
+subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+// should try to find coordinator since we are auto committing
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());
+assertTrue(client.hasInFlightRequests());
+
+client.respond(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(0));
+assertFalse(coordinator.coordinatorUnknown());
+// after we've discovered the coordinator we should send
+// out the commit request immediately
+assertTrue(client.hasInFlightRequests());
+}
+}
+
+@Test
+public void testCommitAsyncWithUserAssignedType() {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// set timeout to 0 because we expect no requests sent
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());
+assertFalse(client.hasInFlightRequests());
+
+// should try to find coordinator since we are commit async
+coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
+throw new AssertionError("Commit should not get responses");

Review Comment:
   ack.


