Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
lucasbru merged PR #15035: URL: https://github.com/apache/kafka/pull/15035 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1867003094

@dajac - thanks for merging the changes.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1865922174

Don't worry! We will take care of restarting the builds.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1865440196

Hmm @cadonna - it seems there are some build stability issues. My other PR, `PR-15023`, also failed after 8 hours, and it is completely unrelated to the consumer refactor.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432925646

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
     @Test
     public void testPollTimerExpiration() {
-        heartbeatRequestManager = createHeartbeatRequestManager();
+        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        membershipManager = mock(MembershipManager.class);
+        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+        heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
The spy is there to verify the `reset` call, and we also need a real object so that the manager can actually send a request. If we mocked it, we would have to stub the `canSendRequest` call, and then verifying the request returned from `poll` would be pointless because it would always return one.
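The trade-off in the comment above can be illustrated without Mockito. The following is a minimal sketch under stated assumptions: `RequestStateSketch` and `RecordingRequestState` are hypothetical names invented for this example and are not Kafka's `HeartbeatRequestState`. A hand-written recording subclass plays the role of a spy: it keeps the real `canSendRequest` logic while counting `reset` calls.

```java
// Hypothetical stand-in for a request-state object; not Kafka's HeartbeatRequestState.
class RequestStateSketch {
    private final long intervalMs;
    private long lastSentMs = -1L; // -1 means "nothing sent yet"

    RequestStateSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Real logic: sending is allowed if nothing was sent yet or the interval has elapsed.
    boolean canSendRequest(long nowMs) {
        return lastSentMs < 0 || nowMs - lastSentMs >= intervalMs;
    }

    void onSend(long nowMs) {
        lastSentMs = nowMs;
    }

    void reset() {
        lastSentMs = -1L;
    }
}

// Hand-rolled "spy": delegates to the real behaviour and records how often reset() ran.
class RecordingRequestState extends RequestStateSketch {
    int resetCalls = 0;

    RecordingRequestState(long intervalMs) {
        super(intervalMs);
    }

    @Override
    void reset() {
        resetCalls++;
        super.reset();
    }
}

class SpySketchDemo {
    public static void main(String[] args) {
        RecordingRequestState state = new RecordingRequestState(1000L);
        state.onSend(0L);
        System.out.println(state.canSendRequest(10L)); // false: the interval has not elapsed
        state.reset();
        System.out.println(state.canSendRequest(10L)); // true: reset cleared the last send
        System.out.println(state.resetCalls);          // 1
    }
}
```

With a pure mock, the answer of `canSendRequest` would be whatever the test stubbed, so asserting on it (or on anything derived from it) would only test the stub.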
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432918143

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
     @Test
     public void testPollTimerExpiration() {
-        heartbeatRequestManager = createHeartbeatRequestManager();
+        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        membershipManager = mock(MembershipManager.class);
+        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+        heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
If it is too much work to change, let's change it later and merge it as it is.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432897655

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
     @Test
     public void testPollTimerExpiration() {
-        heartbeatRequestManager = createHeartbeatRequestManager();
+        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        membershipManager = mock(MembershipManager.class);
+        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+        heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
Why is there still this spy?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
+            // TODO: Integrate partition revocation/loss callback
+            // Clear the current assignment and subscribed partitions because the member has left the group
+            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);

Review Comment:
Is it correct that we have this call now in two places?
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1864595438

@cadonna @dajac @AndrewJSchofield - Thanks a lot for the reviews. I've made changes accordingly.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432803468

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -679,4 +683,23 @@ private HeartbeatRequestManager createHeartbeatRequestManager() {
             heartbeatRequestState, backgroundEventHandler);
     }
+
+    private HeartbeatRequestManager createHeartbeatRequestManager(

Review Comment:
It turns out we are using testBuilder in this test again, so we might want to refactor the test and turn everything into mocks.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432800050

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -191,10 +191,11 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
                 "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
                 "returned in poll() with max.poll.records.");
             // This should trigger a heartbeat with leave group epoch
-            membershipManager.transitionToStaled();
+            membershipManager.transitionToStale();

Review Comment:
transitionTo**Stale**
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432799267

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
+            // TODO: Integrate partition revocation/loss callback
+            // Clear the current assignment and subscribed partitions because the member has left the group
+            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);

Review Comment:
Thanks, I originally thought we wanted to abandon the partitions after sending the leave group.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
dajac commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432786211

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
+            // TODO: Integrate partition revocation/loss callback
+            // Clear the current assignment and subscribed partitions because the member has left the group
+            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);

Review Comment:
> Do you mean in `transitionToStaled()`? That makes sense to me.

Yes.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
AndrewJSchofield commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432701772

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
+            // TODO: Integrate partition revocation/loss callback
+            // Clear the current assignment and subscribed partitions because the member has left the group
+            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);

Review Comment:
Indeed. I find this very confusing. `STALED` is not a word. I'm not sure whether it means `STALE` or `STALLED`, although I think it's the former. If we want to make a verb for this, I suggest `GONE_STALE`. But really, I think that `transitionToStale` is probably best, along with changing to `MemberState.STALE`.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432677841

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -528,13 +529,18 @@ public void testPollTimerExpiration() {
         heartbeatRequestManager = createHeartbeatRequestManager();
         when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
+        doNothing().when(membershipManager).transitionToStaled();

Review Comment:
I do not think you need this. It should also work without it.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
+            // TODO: Integrate partition revocation/loss callback
+            // Clear the current assignment and subscribed partitions because the member has left the group
+            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);

Review Comment:
Do you mean in `transitionToStaled()`? That makes sense to me.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
dajac commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432517804

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
+            // TODO: Integrate partition revocation/loss callback
+            // Clear the current assignment and subscribed partitions because the member has left the group
+            updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);

Review Comment:
Should we rather call this before `transitionTo(MemberState.STALED);`?
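One way to read the suggestion above is to do the clearing inside the transition method itself, just before the state changes, so the bookkeeping lives in a single place. The sketch below is only an illustration of that ordering under stated assumptions: `MemberSketch`, its fields, and the string partition names are hypothetical, and the `-1` constant is assumed to mirror `ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH`. It is not the code from `MembershipManagerImpl`.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, heavily simplified model of a group member; names are illustrative only.
class MemberSketch {
    enum State { STABLE, STALE }

    // Assumed to mirror ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    private State state = State.STABLE;
    private int memberEpoch = 5; // arbitrary starting epoch for the demo
    private final Set<String> assignment = new TreeSet<>();

    void assign(String partition) {
        assignment.add(partition);
    }

    // Epoch change, clearing, and state change happen together, in this order.
    void transitionToStale() {
        memberEpoch = LEAVE_GROUP_MEMBER_EPOCH;
        assignment.clear();  // cleared before the state flips to STALE
        state = State.STALE;
    }

    State state() { return state; }
    int memberEpoch() { return memberEpoch; }
    Set<String> assignment() { return assignment; }
}

class MemberSketchDemo {
    public static void main(String[] args) {
        MemberSketch member = new MemberSketch();
        member.assign("topic-0");
        member.transitionToStale();
        System.out.println(member.state() + " " + member.memberEpoch() + " " + member.assignment());
    }
}
```

Keeping the clearing in the transition avoids the "same call in two places" concern raised elsewhere in this thread, at the cost of deferring any rebalance-listener callback, which the TODO in the diff leaves open.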
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1863986474

Hi @cadonna - Thanks for putting time into the review. I've addressed your comments.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432322309

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -234,7 +235,8 @@ public long maximumTimeToWait(long currentTimeMs) {
      * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
      * member is in the {@link MemberState#UNSUBSCRIBED} state.

Review Comment:
Is the state here correct?

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -169,7 +169,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       startingTimestamp = startingTimestamp)
   }

-  // TODO: enable this test for the consumer group protocol when KAFKA-16008 has been fixed.

Review Comment:
Why did you remove this comment but not enable the test?

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -196,7 +195,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(1, listener.callsToRevoked)
   }

-  // TODO: enable this test for the consumer group protocol when KAFKA-16009 has been fixed.

Review Comment:
Why did you remove this comment but not enable the test?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -660,14 +672,14 @@ private HeartbeatRequestManager createHeartbeatRequestManager() {
         subscriptions = mock(SubscriptionState.class);
         membershipManager = mock(MembershipManager.class);
         backgroundEventHandler = mock(BackgroundEventHandler.class);
-        heartbeatState = new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
-        heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
+        heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs));

Review Comment:
No, I don't. Why not pass mocks to this method?
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432202539

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -234,7 +237,8 @@ public long maximumTimeToWait(long currentTimeMs) {
      * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
      * member is in the {@link MemberState#UNSUBSCRIBED} state.
      */
-    public void resetPollTimer() {
+    public void resetPollTimer(final long pollMs) {
+        pollTimer.update(pollMs);

Review Comment:
Note: `pollMs` is the time at which the user invoked `consumer#poll`. This is better than taking `time.milliseconds()` because there could be some discrepancy between the two.
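The effect of updating the timer with the caller-supplied poll time before resetting it can be sketched as follows. This is a minimal illustration, assuming a simple deadline timer whose `reset` restarts the countdown from the timer's own notion of "now"; `PollTimerSketch` is a hypothetical class written for this example and is not Kafka's `Timer`.

```java
// Hypothetical, simplified deadline timer; not Kafka's actual Timer class.
class PollTimerSketch {
    private final long timeoutMs;
    private long currentTimeMs;
    private long deadlineMs;

    PollTimerSketch(long timeoutMs, long startMs) {
        this.timeoutMs = timeoutMs;
        this.currentTimeMs = startMs;
        this.deadlineMs = startMs + timeoutMs;
    }

    // Advance the timer's notion of "now"; it never moves backwards.
    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    // Restart the countdown from the timer's current time.
    void reset() {
        deadlineMs = currentTimeMs + timeoutMs;
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    long deadlineMs() {
        return deadlineMs;
    }

    // Same shape as the changed method in the diff: update with the poll time, then reset.
    void resetPollTimer(long pollMs) {
        update(pollMs);
        reset();
    }
}

class PollTimerSketchDemo {
    public static void main(String[] args) {
        PollTimerSketch stale = new PollTimerSketch(100L, 0L);
        stale.reset();                          // no update: deadline is computed from time 0
        System.out.println(stale.deadlineMs()); // 100

        PollTimerSketch fresh = new PollTimerSketch(100L, 0L);
        fresh.resetPollTimer(250L);             // poll happened at t=250
        System.out.println(fresh.deadlineMs()); // 350
    }
}
```

In the first case the new deadline is derived from an outdated current time, which matches the kind of bug described later in this thread: the reset happens, but the countdown does not start from the moment of the poll.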
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432201481

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -195,6 +195,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
             NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true);
             // We can ignore the leave response because we can join before or after receiving the response.
             heartbeatRequestState.reset();
+            heartbeatState.reset();

Review Comment:
One example would be a fenced exception: we need to reset before sending a joining heartbeat.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432201197

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -288,7 +288,7 @@ public CompletableFuture> addOffsetFetchR
     }

     public void updateAutoCommitTimer(final long currentTimeMs) {
-        this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
+        this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs));

Review Comment:
Switching to `updateTimer` for a more descriptive name.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199885

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -660,14 +672,14 @@ private HeartbeatRequestManager createHeartbeatRequestManager() {
         subscriptions = mock(SubscriptionState.class);
         membershipManager = mock(MembershipManager.class);
         backgroundEventHandler = mock(BackgroundEventHandler.class);
-        heartbeatState = new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
-        heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
+        heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs));

Review Comment:
The purpose of spying was to make sure a certain method was invoked. I think this is an acceptable use case, do you agree?
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199373

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -385,6 +386,7 @@ public void testWakeupAfterEmptyFetch() {
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }

+    @Test

Review Comment:
I have no idea why this was removed. It could be me.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199230

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ##
@@ -136,7 +135,7 @@ private void process(final PollApplicationEvent event) {
         }

         requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
-        requestManagers.heartbeatRequestManager.ifPresent(HeartbeatRequestManager::resetPollTimer);

Review Comment:
It turned out this is a bug: the timer's current time was not updated.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199044

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() {
     @Override
     public void transitionToStaled() {
         memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-        currentAssignment.clear();

Review Comment:
See the TODO on line 676. For now, I think we can directly clear the `currentAssignment` and remove owned partitions. In the future, we will need to wait for the callback to complete before proceeding with joining.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432198577

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
+        // If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before
+        // sending another leave group.

Review Comment:
The change was removed. I think we don't need to make a special case for reconciliation.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431733629

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-            transitionToJoining();
+            // clear the current assignment and subscription, and trigger rebalance listener on the next poll
+            invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> {

Review Comment:
Yes, good idea! I agree!
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431731425

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-            transitionToJoining();
+            // clear the current assignment and subscription, and trigger rebalance listener on the next poll
+            invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> {

Review Comment:
I agree. We had a bit of discussion around this. I think whether to call `onPartitionsLost` or `onPartitionsRevoked` is up for discussion. Perhaps we limit the scope of this change to just clearing the assignment and subscription without invoking the callbacks.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431584651

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-            transitionToJoining();
+            // clear the current assignment and subscription, and trigger rebalance listener on the next poll
+            invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> {

Review Comment:
I discussed this offline with @dajac and we think that, since exceeding the poll interval is something unexpected, we should call `onPartitionsLost()` rather than `onPartitionsRevoked()`.
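The distinction proposed above can be sketched as follows. This is an illustration only, under stated assumptions: `RebalanceListenerSketch` is a hypothetical local interface that merely mirrors the two callback names under discussion, `LeavingMemberSketch` is invented for the example, and partitions are plain strings. It does not show what the merged PR does, since later comments in this thread limit the change to clearing the assignment without invoking callbacks.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical local interface; it only mirrors the two callback names discussed above.
interface RebalanceListenerSketch {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsLost(Collection<String> partitions);
}

// Records which callback fired, so the demo and tests can inspect it.
class RecordingListener implements RebalanceListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        events.add("revoked:" + partitions);
    }

    @Override
    public void onPartitionsLost(Collection<String> partitions) {
        events.add("lost:" + partitions);
    }
}

// Hypothetical member that gives up its partitions in two different situations.
class LeavingMemberSketch {
    private final Set<String> assigned = new TreeSet<>();
    private final RebalanceListenerSketch listener;

    LeavingMemberSketch(RebalanceListenerSketch listener) {
        this.listener = listener;
    }

    void assign(String partition) {
        assigned.add(partition);
    }

    // Expected, orderly hand-back (for example at close()): treated as a revocation.
    void close() {
        release(false);
    }

    // Unexpected: the poll interval was exceeded, so the partitions are treated as lost.
    void onPollIntervalExceeded() {
        release(true);
    }

    private void release(boolean lost) {
        List<String> snapshot = new ArrayList<>(assigned);
        if (lost) {
            listener.onPartitionsLost(snapshot);
        } else {
            listener.onPartitionsRevoked(snapshot);
        }
        assigned.clear();
    }

    Set<String> assigned() {
        return assigned;
    }
}

class LostVsRevokedDemo {
    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        LeavingMemberSketch member = new LeavingMemberSketch(listener);
        member.assign("t-0");
        member.assign("t-1");
        member.onPollIntervalExceeded();
        System.out.println(listener.events); // [lost:[t-0, t-1]]
    }
}
```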
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431573878

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-            transitionToJoining();
+            // clear the current assignment and subscription, and trigger rebalance listener on the next poll
+            invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> {

Review Comment:
Revocation at `close()`-time seems legit.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1431565902 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() { MemberState state = state(); if (isStaled()) { log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); -transitionToJoining(); +// clear the current assignment and subscription, and trigger rebalance listener on the next poll + invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> { Review Comment: OK, I just saw https://issues.apache.org/jira/browse/KAFKA-15696
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1431561378 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() { MemberState state = state(); if (isStaled()) { log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); -transitionToJoining(); +// clear the current assignment and subscription, and trigger rebalance listener on the next poll + invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> { Review Comment: Isn't revocation something that the broker tells the member to do because of rebalancing? In this case, it is the member that decides to leave the group because it exceeded the poll interval. It is similar to close. I do not think we revoke partitions in close, do we?
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1431513723 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); -if (pollTimer.isExpired()) { +// If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before +// sending another leave group. +if (pollTimer.isExpired() && membershipManager.state() == MemberState.STABLE) { Review Comment: good call - will add one.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1431513080 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() { MemberState state = state(); if (isStaled()) { log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); -transitionToJoining(); +// clear the current assignment and subscription, and trigger rebalance listener on the next poll + invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> { Review Comment: When the member becomes stale, it is still part of the group because it has not left yet. However, the background thread needs to leave the group and revoke all of its assignments, because we assume the consumer is no longer being used. This is the logic behind `max.poll.interval.ms`. Since we are leaving the group, we need to revoke all assignments. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() { @Override public void transitionToStaled() { memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; -currentAssignment.clear(); Review Comment: Correct.
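The stale-member handling described in the first comment above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: the body of the `whenComplete` callback is not visible in the quoted hunk, so the sketch assumes (based on the code comment and the removed `transitionToJoining()` line) that the assignment is cleared and the member moves toward rejoining once the listener callback has finished. The class, field, and method names are hypothetical, and partitions are plain strings.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

class StaleLeaveSketch {
    // Hypothetical in-memory stand-ins for the member's assignment and state.
    final Set<String> assigned = new HashSet<>(Set.of("topic-0", "topic-1"));
    String state = "STALE";
    int partitionsSeenByCallback = -1;

    // Hypothetical stand-in for invoking the user's rebalance listener.
    // The returned future completes once the callback has run.
    CompletableFuture<Void> invokeRevokedCallback(Set<String> partitions) {
        partitionsSeenByCallback = partitions.size();
        return CompletableFuture.completedFuture(null);
    }

    // Called after the leave-group heartbeat has been sent for a stale member.
    void onLeaveHeartbeatSent() {
        // Pass a snapshot so the callback sees the assignment as it was before clearing.
        Set<String> snapshot = new HashSet<>(assigned);
        invokeRevokedCallback(snapshot).whenComplete((result, error) -> {
            // Assumption: clear the assignment even if the callback failed,
            // because the member is leaving the group either way.
            assigned.clear();
            state = "JOINING";
        });
    }

    public static void main(String[] args) {
        StaleLeaveSketch member = new StaleLeaveSketch();
        member.onLeaveHeartbeatSent();
        System.out.println(member.state + " " + member.assigned);
    }
}
```

The ordering is the point of the sketch: the listener sees the full assignment first, and the local state is only cleared after the callback completes.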
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on PR #15035: URL: https://github.com/apache/kafka/pull/15035#issuecomment-1862913858 Test `testEnsurePollEventSentOnConsumerPoll()` fails deterministically in all CI builds. That means something is wrong either with the test you added or with the production code it tests. Please run locally at least the tests that you added or modified, or that are affected by your change, and make sure they pass. Otherwise, we review code that obviously needs changes, and we end up cycling over the PR more than needed.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1431509274 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); -if (pollTimer.isExpired()) { +// If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before +// sending another leave group. Review Comment: If the member is reconciling its assignment, we need to wait for that process to finish before revoking all assignments and then leaving the group.
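The gating described here, where an expired poll timer only triggers a leave once the member is stable, can be sketched as below. This is a simplified, self-contained illustration rather than the `HeartbeatRequestManager` code: `GateState` is a hypothetical two-value stand-in for `MemberState`, and the timer is modelled as a plain timestamp comparison instead of Kafka's `Timer` utility.

```java
// Hypothetical, reduced set of member states; the real MemberState enum has more values.
enum GateState { STABLE, RECONCILING }

class LeaveGateSketch {
    private final long maxPollIntervalMs;
    private long lastPollMs;
    GateState state = GateState.STABLE;

    LeaveGateSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Each application-level poll (forwarded to the background thread) restarts the timer.
    void onApplicationPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    boolean pollTimerExpired(long nowMs) {
        return nowMs - lastPollMs >= maxPollIntervalMs;
    }

    // Same shape as the condition in the diff: expired AND stable.
    // While reconciling, the leave is deferred until reconciliation completes.
    boolean shouldSendLeaveGroup(long nowMs) {
        return pollTimerExpired(nowMs) && state == GateState.STABLE;
    }

    public static void main(String[] args) {
        LeaveGateSketch gate = new LeaveGateSketch(300_000L, 0L);
        gate.state = GateState.RECONCILING;
        System.out.println(gate.shouldSendLeaveGroup(400_000L)); // timer expired, still reconciling
        gate.state = GateState.STABLE;
        System.out.println(gate.shouldSendLeaveGroup(400_000L)); // timer expired and stable
    }
}
```

A unit test for the case where the condition is not satisfied, as requested in the review, would exercise the `RECONCILING` branch shown in `main`.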
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
cadonna commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1431466392 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() { @Override public void transitionToStaled() { memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; -currentAssignment.clear(); Review Comment: Why do we not need to remove the current assignment if we leave the group? Is this because of `updateSubscription()` on line 677? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); -if (pollTimer.isExpired()) { +// If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before +// sending another leave group. Review Comment: Could you please elaborate why we need to wait? It is not clear to me, because I thought that we can leave the group whenever we want. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); -if (pollTimer.isExpired()) { +// If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before +// sending another leave group. +if (pollTimer.isExpired() && membershipManager.state() == MemberState.STABLE) { Review Comment: Is there a unit test that tests the behavior when this condition is not satisfied? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() { MemberState state = state(); if (isStaled()) { log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); -transitionToJoining(); +// clear the current assignment and subscription, and trigger rebalance listener on the next poll + invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> { Review Comment: Is it correct to call `onPartitionsRevoked()` here? Technically, the member is no longer a member of the group. I am not sure it should be allowed to do anything group-related, such as committing offsets, which is one of the main purposes of `onPartitionsRevoked()`.
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1429635957 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -195,6 +195,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true); // We can ignore the leave response because we can join before or after receiving the response. heartbeatRequestState.reset(); +heartbeatState.reset(); Review Comment: We might need to reset this in other places where a rejoin is required.