Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-22 Thread via GitHub


lucasbru merged PR #15035:
URL: https://github.com/apache/kafka/pull/15035


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-21 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1867003094

   @dajac - thanks for merging the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-21 Thread via GitHub


cadonna commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1865922174

   Don't worry! We will take care of restarting the builds. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1865440196

   hmm @cadonna - seems like there are some build stability issue.  my other PR 
also failed after 8hr (it is completely unrelated to the consumer refactor) 
`PR-15023` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432925646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   it is there to verify the reset call and we also need a real object to 
ensure the manager can send a request.  If we mock it, then we have to mock the 
canSendRequest call, then it is pointless to verify the request returned from 
poll because it will always return one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432918143


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   If it is too much work to change, let's change it later and merge it as it 
is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432918143


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   If it is too much work to change, let's change it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432897655


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -525,16 +526,37 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-heartbeatRequestManager = createHeartbeatRequestManager();
+coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+membershipManager = mock(MembershipManager.class);
+heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(

Review Comment:
   Why is there still this spy?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   Is it correct that we have this call now in two places?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1864595438

   @cadonna @dajac @AndrewJSchofield  - Thanks a lot for the reviews.  I've 
made changes accordingly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432803468


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -679,4 +683,23 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager() {
 heartbeatRequestState,
 backgroundEventHandler);
 }
+
+private HeartbeatRequestManager createHeartbeatRequestManager(

Review Comment:
   turned out we are using testBuilder in this test again.  So we might want to 
refactor the test and turn everything to mock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432800050


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -191,10 +191,11 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 "either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
 "returned in poll() with max.poll.records.");
 // This should trigger a heartbeat with leave group epoch
-membershipManager.transitionToStaled();
+membershipManager.transitionToStale();

Review Comment:
   transitionTo**Stale**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432799267


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   Thanks, I originally thought we wanted to abandon the partitions after 
sending the leave group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


dajac commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432786211


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   > Do you mean in `transitionToStaled()`? That makes sense to me.
   
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


AndrewJSchofield commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432701772


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   Indeed. I find this very confusing. `STALED` is not a word. I'm not sure 
whether it means `STALE` or `STALLED`, although I think it's the former. If we 
want to make a verb for this, I suggest `GONE_STALE`. But really, I think that 
`transitionToStale` is probably best, along with changing to 
`MemberState.STALE`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432677841


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -528,13 +529,18 @@ public void testPollTimerExpiration() {
 heartbeatRequestManager = createHeartbeatRequestManager();
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
 when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
+when(membershipManager.state()).thenReturn(MemberState.STABLE);
+doNothing().when(membershipManager).transitionToStaled();

Review Comment:
   I do not think, you need this. Should also work without.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   Do you mean in `transitionToStaled()`? That makes sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-20 Thread via GitHub


dajac commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432517804


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -673,6 +673,9 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
+// TODO: Integrate partition revocation/loss callback
+// Clear the current assignment and subscribed partitions because 
the member has left the group
+updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), 
true);

Review Comment:
   Should we rather call this before `transitionTo(MemberState.STALED);`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1863986474

   hi @cadonna - Thanks for putting time into the review.  I've addressed your 
comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432322309


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -234,7 +235,8 @@ public long maximumTimeToWait(long currentTimeMs) {
  * When consumer polls, we need to reset the pollTimer.  If the poll timer 
has expired, we rejoin only when the
  * member is in the {@link MemberState#UNSUBSCRIBED} state.

Review Comment:
   Is the state here correct? 



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -169,7 +169,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   startingTimestamp = startingTimestamp)
   }
 
-  // TODO: enable this test for the consumer group protocol when KAFKA-16008 
has been fixed.

Review Comment:
   Why did you remove this comment but not enable the test?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -196,7 +195,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(1, listener.callsToRevoked)
   }
 
-  // TODO: enable this test for the consumer group protocol when KAFKA-16009 
has been fixed.

Review Comment:
   Why did you remove this comment but not enable the test?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -660,14 +672,14 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager() {
 subscriptions = mock(SubscriptionState.class);
 membershipManager = mock(MembershipManager.class);
 backgroundEventHandler = mock(BackgroundEventHandler.class);
-heartbeatState = new 
HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 
maxPollIntervalMs);
-heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+heartbeatState = spy(new 
HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 
maxPollIntervalMs));

Review Comment:
   No, I don't.  Why not passing mocks to this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432202539


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -234,7 +237,8 @@ public long maximumTimeToWait(long currentTimeMs) {
  * When consumer polls, we need to reset the pollTimer.  If the poll timer 
has expired, we rejoin only when the
  * member is in the {@link MemberState#UNSUBSCRIBED} state.
  */
-public void resetPollTimer() {
+public void resetPollTimer(final long pollMs) {
+pollTimer.update(pollMs);

Review Comment:
   note: pollMs is the time user invoke consumer#poll.  This is better than 
taking time.millisecond() because there could be some discrepancy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432201481


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -195,6 +195,7 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(currentTimeMs, true);
 // We can ignore the leave response because we can join before or 
after receiving the response.
 heartbeatRequestState.reset();
+heartbeatState.reset();

Review Comment:
   One example would be fenced exception - we need to reset before sending a 
joining heartbeat.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432201197


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -288,7 +288,7 @@ public CompletableFuture> addOffsetFetchR
 }
 
 public void updateAutoCommitTimer(final long currentTimeMs) {
-this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
+this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs));

Review Comment:
   switching to updateTimer for a more descriptive name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199885


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -660,14 +672,14 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager() {
 subscriptions = mock(SubscriptionState.class);
 membershipManager = mock(MembershipManager.class);
 backgroundEventHandler = mock(BackgroundEventHandler.class);
-heartbeatState = new 
HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 
maxPollIntervalMs);
-heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+heartbeatState = spy(new 
HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 
maxPollIntervalMs));

Review Comment:
   the purpose of spying was to make sure a certain method was invoked. I think 
this is an acceptable use case, do you agree?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199373


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -385,6 +386,7 @@ public void testWakeupAfterEmptyFetch() {
 assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
 }
 
+@Test

Review Comment:
   I have no idea why this was removed.  It could be me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199230


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -136,7 +135,7 @@ private void process(final PollApplicationEvent event) {
 }
 
 requestManagers.commitRequestManager.ifPresent(m -> 
m.updateAutoCommitTimer(event.pollTimeMs()));
-
requestManagers.heartbeatRequestManager.ifPresent(HeartbeatRequestManager::resetPollTimer);

Review Comment:
   turned out this is a bug - the timer's current time was not updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432199044


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() {
 @Override
 public void transitionToStaled() {
 memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-currentAssignment.clear();

Review Comment:
   See the TODO on line 676.  For now, I think we can directly clear the 
currentAssignment and remove owned partitions.  In the future, we will need to 
wait for the callback to complete before proceed with joining.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1432198577


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
+// If the poll timer expires during reconciliation, we need to wait 
till the reconciliation completes before
+// sending another leave group.

Review Comment:
   the change was removed - i think we don't need to make a special case for 
reconciliation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431733629


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   Yes, good idea! I agree!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431731425


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   I agree.  We had a bit of discussion around this.  I think whether calling 
onPartitionLost/Revoke is up for discussion. Perhaps we limit the scope of the 
change to just clearing the assignment subscription w/o invoking the callbacks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431584651


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   I discussed this offline with @dajac and we think that since exceeding the 
poll interval is something unexpected we should rather call `onPartitionLost()` 
rather than `onPartitionRevoked()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431573878


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   Revocation at `close()`-time seems legit.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431565902


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   OK, I just saw https://issues.apache.org/jira/browse/KAFKA-15696
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431561378


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   Isn't revocation something that the broker tells the member to because of 
rebalancing? In this case it is the member that decides to leave the group 
because it exceeded the poll interval. It is similar to close. I do not think 
we revoke partitions in close, do we? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431513723


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
+// If the poll timer expires during reconciliation, we need to wait 
till the reconciliation completes before
+// sending another leave group.
+if (pollTimer.isExpired() && membershipManager.state() == 
MemberState.STABLE) {

Review Comment:
   good call - will add one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431513080


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   when the member becomes staled, it is still part of the group because it 
hasn't left yet.  however, the background thread will need to leave the group 
and revoke all of its assignment because we assume the consumer is not being 
used - this is the logic for max.poll.interval.ms.  since we are leaving the 
group, we will need to revoke all assignments.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() {
 @Override
 public void transitionToStaled() {
 memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-currentAssignment.clear();

Review Comment:
   correct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1862913858

   Test `testEnsurePollEventSentOnConsumerPoll()` fails deterministically in 
all CI builds. That means that something is wrong either with the test you 
added or with the production code that is tested. Please ensure that at least 
the tests you added or modified or are affected by your change are successful 
by running them locally. Otherwise, we review code that obviously needs changes 
and we end up cycling over the PR more than needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431509274


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
+// If the poll timer expires during reconciliation, we need to wait 
till the reconciliation completes before
+// sending another leave group.

Review Comment:
   if the member is reconciling its assignment, we need to wait for the process 
to finish before revoking all assignments, then leave the group.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-19 Thread via GitHub


cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431466392


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() {
 @Override
 public void transitionToStaled() {
 memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-currentAssignment.clear();

Review Comment:
   Why do we not need to remove the current assignment if we leave the group? 
Is this because of `updateSubscription()` on line 677?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
+// If the poll timer expires during reconciliation, we need to wait 
till the reconciliation completes before
+// sending another leave group.

Review Comment:
   Could you please elaborate why we need to wait? It is not clear to me, 
because I thought that we can leave the group whenever we want.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
+// If the poll timer expires during reconciliation, we need to wait 
till the reconciliation completes before
+// sending another leave group.
+if (pollTimer.isExpired() && membershipManager.state() == 
MemberState.STABLE) {

Review Comment:
   Is there a unit test that tests the behavior when this condition is not 
satisfied?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
 MemberState state = state();
 if (isStaled()) {
 log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-transitionToJoining();
+// clear the current assignment and subscription, and trigger 
rebalance listener on the next poll
+
invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r,
 e) -> {

Review Comment:
   Is it correct to call `onPartitionRevoked()` here? Technically, the member 
is not a member of the group anymore. I am not sure it should be allowed to do 
anything group-related like committing offsets which is one of the main 
purposes of `onPartitionRevoked()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-18 Thread via GitHub


philipnee commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1429635957


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -195,6 +195,7 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(currentTimeMs, true);
 // We can ignore the leave response because we can join before or 
after receiving the response.
 heartbeatRequestState.reset();
+heartbeatState.reset();

Review Comment:
   We might need to reset this at other places where it requires to rejoin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org