Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-12 Thread via GitHub


lucasbru merged PR #14873:
URL: https://github.com/apache/kafka/pull/14873


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-12 Thread via GitHub


lucasbru commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1851574858

   Test failures are unrelated.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1423372314


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -948,6 +948,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 // 1 consumer using range assignment
 this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "range-group")
 this.consumerConfig.setProperty(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")
+this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "3")

Review Comment:
   The test is pretty flaky with the default of 6s, so I set it to a higher value.
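For readers following along in Java, here is a minimal sketch of the same three settings as plain `java.util.Properties`. The key strings are written out so the snippet compiles without kafka-clients. They are meant to correspond to `ConsumerConfig.GROUP_ID_CONFIG`, `GROUP_REMOTE_ASSIGNOR_CONFIG` and `MAX_POLL_INTERVAL_MS_CONFIG`. The `RangeGroupConsumerProps` helper is hypothetical. The 30000 ms used in the usage example is the 30s value mentioned later in the thread, not necessarily what the PR committed.

```java
import java.util.Properties;

// Hypothetical helper mirroring the Scala test setup: a consumer in "range-group"
// using the server-side range assignor, with a longer max.poll.interval.ms.
final class RangeGroupConsumerProps {
    // Assumed to match the string values behind the ConsumerConfig constants.
    static final String GROUP_ID = "group.id";
    static final String GROUP_REMOTE_ASSIGNOR = "group.remote.assignor";
    static final String MAX_POLL_INTERVAL_MS = "max.poll.interval.ms";

    static Properties build(String groupId, long maxPollIntervalMs) {
        if (maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("max.poll.interval.ms must be positive");
        }
        Properties props = new Properties();
        props.setProperty(GROUP_ID, groupId);
        props.setProperty(GROUP_REMOTE_ASSIGNOR, "range");
        // Properties values are strings; the consumer parses this as milliseconds.
        props.setProperty(MAX_POLL_INTERVAL_MS, Long.toString(maxPollIntervalMs));
        return props;
    }
}
```

Usage: `RangeGroupConsumerProps.build("range-group", 30_000L)` yields `max.poll.interval.ms=30000`.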






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1423147503


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
-private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+/**
+ * When consumer polls, we need to reset the pollTimer.  If the poll timer has expired, we rejoin only when the
+ * member is in the {@link MemberState#UNSUBSCRIBED} state.
+ */
+public void resetPollTimer() {
+pollTimer.reset(maxPollIntervalMs);
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+ final boolean ignoreResponse) {
+NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();
+return request;
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-coordinatorRequestManager.coordinator());
+new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+coordinatorRequestManager.coordinator());
 return request.whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+// The response is only ignore when the member becomes staled. This is because the member needs to
+// rejoin on the next poll regardless the server has responded to the heartbeat.
+if (!ignoreResponse) {
+membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+maybeSendGroupMetadataUpdateEvent();

Review Comment:
   OK, thank you for the clarification!
   Sorry, but I still have two comments:
   
   1. Why do we still need to call `onResponse()`? As far as I understand, everything we do there is not strictly needed, because we will do the same when we restart heartbeating in the next poll, won't we? Is the error handling in this case so important? Wouldn't it be equally fine not to handle errors until we start heartbeating again? I am asking because the code, and probably also the reasoning about the code, would become simpler if we just ignored the response completely. I also see that it is not a big deal either way, so if you do not want to change it, that is fine with me.
   However, I just realized that with this change we will call `membershipManager.onHeartbeatResponseReceived()` and `maybeSendGroupMetadataUpdateEvent()` in the error case, won't we? Before the change, these two calls were only made when there was no error. Is this acceptable?
   
   2. I understand that the commit gets rejected by the brokers. My question was whether we should avoid sending a commit request in the first place, since the consumer actually knows that it is no longer part of a group.
   
   A last question:
   
   > That being said, following commit requests issued after the consumer left the group will succeed because they will be sent without member ID or epoch (noticing that the member is not part of the group anymore).
   
   I am not sure I understand this. Why does the commit succeed?
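One way to address point 1 is sketched below with hypothetical simplified types (`HeartbeatResponse`, `HeartbeatResponseHandling`) rather than the real `HeartbeatRequestManager` API. Error handling always runs. The membership update only runs for an error-free response that is not being ignored. This illustrates the ordering under discussion; it is not the PR's code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the heartbeat response data; only the fields this sketch needs.
record HeartbeatResponse(short errorCode, int memberEpoch) {
    static final short NONE = 0;
}

final class HeartbeatResponseHandling {
    int errorsHandled = 0;     // how often the error-handling branch ran
    int membershipUpdates = 0; // how often membership state would have been updated

    // Error responses are handled and never reach the membership update;
    // successful responses update membership unless ignoreResponse is set
    // (the member went stale and will rejoin on the next poll anyway).
    CompletableFuture<HeartbeatResponse> attachHandler(CompletableFuture<HeartbeatResponse> inFlight,
                                                       boolean ignoreResponse) {
        return inFlight.whenComplete((response, exception) -> {
            if (response == null) {
                return; // transport failure: out of scope for this sketch
            }
            if (response.errorCode() != HeartbeatResponse.NONE) {
                errorsHandled++;
                return;
            }
            if (!ignoreResponse) {
                membershipUpdates++;
            }
        });
    }
}
```

With this shape, the "error case" concern does not arise, because the membership update sits behind the error check. Whether error handling is worth keeping at all for an ignored response is the open question in point 1.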






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1423103409


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {

Review Comment:
   thanks for the clarification @lianetm 






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850852690

   Per request: https://issues.apache.org/jira/browse/KAFKA-15993





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850844332

   Hey @lianetm - You are right. The test uses a rebalance callback, so it will need to be enabled after the rebalance callback is implemented. However, I did verify the leave group request by setting a breakpoint. Is there a jira for the rebalance callback invocation? I assume it should be part of that task.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1423084344


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {

Review Comment:
   To complete the picture regarding the commits: if a commit was sent out right before leaving, it will include a memberId and epoch that are no longer valid, so the broker will reject it as @philipnee mentioned, sending an UNKNOWN_MEMBER_ID error, and the client will fail the commit with that unrecoverable error. That being said, following commit requests issued after the consumer left the group will succeed because they will be sent without member ID or epoch (noticing that the member is not part of the group anymore). That logic is in the commitRequestManager, but I'm sharing it here to connect the dots.
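The commit-side logic described above can be sketched as follows. The member ID and epoch are included only while the member is in the group; otherwise the request falls back to "no member" sentinels. The types are illustrative, not the real `CommitRequestManager`. The sentinels (empty member ID, epoch -1) are an assumption made for this sketch.

```java
import java.util.Optional;

// Hypothetical snapshot of the member's group state (not a real Kafka class).
record GroupMembership(Optional<String> memberId, int memberEpoch) {
    static GroupMembership notInGroup() {
        return new GroupMembership(Optional.empty(), -1);
    }

    boolean inGroup() {
        return memberId.isPresent() && memberEpoch >= 0;
    }
}

// Hypothetical, simplified commit request fields.
record OffsetCommitFields(String groupId, String memberId, int memberEpoch) {}

final class CommitFieldsBuilder {
    // Assumed sentinels meaning "no member": empty member ID and epoch -1.
    static final String NO_MEMBER_ID = "";
    static final int NO_MEMBER_EPOCH = -1;

    static OffsetCommitFields build(String groupId, GroupMembership membership) {
        if (membership.inGroup()) {
            // Still a member: the broker validates these against the group.
            return new OffsetCommitFields(groupId, membership.memberId().get(), membership.memberEpoch());
        }
        // Left the group: commit without member identity.
        return new OffsetCommitFields(groupId, NO_MEMBER_ID, NO_MEMBER_EPOCH);
    }
}
```

A commit already in flight when the member leaves still carries the old ID and epoch, which is why it can be rejected even though later commits are built without them.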






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


lianetm commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850787095

   @philipnee thanks for the updates! One last comment: an important follow-up to this PR is to enable the specific max-poll-related integration tests (like `testMaxPollIntervalMs`), which are currently disabled because they depend on the callbacks. Is there a jira for that, so we make sure we run them asap? Thanks!





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


lucasbru commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850740142

   @philipnee thanks for the updates.
   @cadonna I'll wait for you to green-light this by approving.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850712115

   Hi @lianetm and @lucasbru - Per your request, I've just run the tests using
   
   ```
   Arguments.of("kraft+kip848", "consumer"))
   ```
   
   All tests passed except `testRemoteAssignorRange`. Per Lucas's suggestion, I extended max.poll.interval.ms to 30s, and with that the test passed.
   
   ```
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testAssignAndConsumeFromCommittedOffsets(String, String) > testAssignAndConsumeFromCommittedOffsets(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSubsequentPatternSubscription(String, String) > testSubsequentPatternSubscription(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSubsequentPatternSubscription(String, String) > testSubsequentPatternSubscription(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testSubsequentPatternSubscription(String, String) > testSubsequentPatternSubscription(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testPerPartitionLeadMetricsCleanUpWithAssign(String, String) > testPerPartitionLeadMetricsCleanUpWithAssign(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 3 > PlaintextConsumerTest > testNullGroupIdNotSupportedIfCommitting(String, String) >

Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1850676664

   hi @cadonna - Sorry about unintentionally ignoring your question.  I've just 
responded to it.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1422983344


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {

Review Comment:
   For calling onResponse: we need to try our best to handle the error if there is one. However, when leaving a group, we don't need to process `onHeartbeatResponseReceived`, because we don't need to reconcile.
   
   After leaving the group, the request manager should not keep heartbeating. For this specific PR, when the poll timer expires, the request manager will return an empty result when it gets polled; see the first if block in poll(). And you are right: when the user polls the consumer, it resets the timer, and on the next request manager poll (they happen asynchronously), it should send a heartbeat to rejoin.
   
   If we commit after leaving the group, I think the commit will get rejected with an error.
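The poll-timer behaviour described above can be made concrete with a heavily simplified, hypothetical model. `PollTimerHeartbeatSketch` is not a real class, and it ignores the heartbeat interval, coordinator discovery and retries. When the timer expires, the background poll sends one leave heartbeat whose response is ignored. It then returns nothing until `resetPollTimer` is called from the application thread, after which the next background poll sends a join heartbeat. The epoch values -1 (leave) and 0 (join) are assumed to follow the KIP-848 heartbeat convention.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the poll-timer / heartbeat interaction (not Kafka's API).
final class PollTimerHeartbeatSketch {
    enum State { STABLE, STALE, JOINING }

    private final long maxPollIntervalMs;
    private long pollDeadlineMs;
    State state = State.STABLE;

    PollTimerHeartbeatSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.pollDeadlineMs = nowMs + maxPollIntervalMs;
    }

    // Application thread: called on every consumer.poll().
    void resetPollTimer(long nowMs) {
        pollDeadlineMs = nowMs + maxPollIntervalMs;
    }

    // Background thread: returns the (symbolic) requests to send on this poll.
    List<String> poll(long nowMs) {
        if (nowMs >= pollDeadlineMs) {
            if (state != State.STALE) {
                // Timer just expired: leave the group once and ignore the response.
                state = State.STALE;
                return List.of("heartbeat(epoch=-1, ignoreResponse=true)");
            }
            // Still stale and the user has not polled: stop heartbeating.
            return Collections.emptyList();
        }
        if (state == State.STALE) {
            // The user polled again (timer reset): rejoin the group.
            state = State.JOINING;
            return List.of("heartbeat(epoch=0)");
        }
        return List.of("heartbeat");
    }
}
```

Because the two threads run asynchronously, the rejoin only happens on the first background poll after the reset, which matches the description in the comment.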
   






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


cadonna commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1849541992

   > @lianetm / @cadonna any more comments on this?
   
   @lucasbru I did not get any answer to the following comment: 
https://github.com/apache/kafka/pull/14873#discussion_r1418729793 
   






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-11 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1422075316


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {

Review Comment:
   @philipnee Any thoughts about my comment?






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-10 Thread via GitHub


lucasbru commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1849099041

   RemoteRangeAssignorTest is still failing for this one





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-09 Thread via GitHub


lianetm commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1848609666

   Thanks for the changes @philipnee, LGTM.
   
   I would only +1 @lucasbru's suggestion of locally running the existing `PlaintextConsumerTest` integration test on this PR, as it is currently disabled in the CI. It would also be good to get some of the max-poll-related tests running (OK if in a different PR, as I expect it will require some changes due to the listeners they use that we don't support yet).





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1847607910

   @lianetm - Thanks for the feedback. I've made some changes based on your comments.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1420828282


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
 return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
 }
 
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToStaled() {
+transitionTo(MemberState.PREPARE_LEAVING);
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+transitionTo(MemberState.LEAVING);
+leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
+transitionTo(MemberState.STALED);

Review Comment:
   nvm - I was confused by the pending assignments. I think we do need to clear the current assignment.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1420817920


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {

Review Comment:
   I think transitionToJoining, which takes place when sending out the 
heartbeat, already clears the current assignment.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1420804236


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
 return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
 }
 
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToStaled() {
+transitionTo(MemberState.PREPARE_LEAVING);
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+transitionTo(MemberState.LEAVING);
+leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
+transitionTo(MemberState.STALED);

Review Comment:
   Thanks, that makes a lot of sense.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1420598917


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
 return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
 }
 
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToStaled() {
+transitionTo(MemberState.PREPARE_LEAVING);
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+transitionTo(MemberState.LEAVING);
+leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
+transitionTo(MemberState.STALED);

Review Comment:
   We also need to make sure that when going down this path, we clear the 
current assignment, so that the member rejoins with its subscription but 
without any owned partitions, ready to take on whatever new partitions the 
broker assigns.
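A minimal sketch of what the stale path could look like, assuming a heavily simplified membership manager (`StaleMemberSketch` and `StaleSketchState` are hypothetical names, not Kafka classes). It shows the two properties asked for: the owned partitions are cleared while the subscription survives, so the rejoin starts from epoch 0 with no assignment. The epochs follow the KIP-848 convention (0 to join, -1 to leave).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical subset of member states; the real MemberState enum has more values.
enum StaleSketchState { STABLE, STALED, JOINING }

// Hypothetical stand-in for MembershipManagerImpl, reduced to the fields this comment is about.
class StaleMemberSketch {
    // Epoch a dynamic member sends to leave, per KIP-848.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    // Epoch a member sends to (re)join, per KIP-848.
    static final int JOIN_GROUP_MEMBER_EPOCH = 0;

    final Set<String> subscription = new HashSet<>();   // topics, e.g. "orders"
    Set<String> currentAssignment = new HashSet<>();     // partitions, e.g. "orders-0"
    int memberEpoch = 5;
    StaleSketchState state = StaleSketchState.STABLE;

    // Poll timer expired and the leave heartbeat was sent: drop owned partitions,
    // but keep the subscription so the next poll can rejoin with it.
    void transitionToStaled() {
        memberEpoch = LEAVE_GROUP_MEMBER_EPOCH;
        currentAssignment = Collections.emptySet();
        state = StaleSketchState.STALED;
    }

    // Next application poll: rejoin owning nothing, ready for whatever the broker assigns.
    void transitionToJoining() {
        memberEpoch = JOIN_GROUP_MEMBER_EPOCH;
        currentAssignment = Collections.emptySet();
        state = StaleSketchState.JOINING;
    }
}
```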






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1420570473


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
 return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
 }
 
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToStaled() {
+transitionTo(MemberState.PREPARE_LEAVING);

Review Comment:
   Is this really needed here? We use this state when we need to take some 
action before sending the last HB (mainly callbacks), but in this case there 
is no async operation to do, we just send the last HB, right? That is done 
explicitly in the HB manager when it detects that the poll timer expired.
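To illustrate the point, here is a sketch assuming a hypothetical, heavily simplified heartbeat manager (`SketchPollTimer` and `SketchHeartbeatManager` are illustrative, not the PR's classes) in which the poll-timer check alone drives the final heartbeat. Since no callbacks have to run first, the leave heartbeat (epoch -1) is sent directly from `poll()` without passing through PREPARE_LEAVING or LEAVING; the member then stays silent until the application polls again and rejoins with epoch 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical deadline timer driven by an explicit "now" so the sketch is deterministic.
class SketchPollTimer {
    private final long timeoutMs;
    private long deadlineMs;

    SketchPollTimer(long nowMs, long timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.deadlineMs = nowMs + timeoutMs;
    }

    void reset(long nowMs) {
        deadlineMs = nowMs + timeoutMs;
    }

    boolean isExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}

// Hypothetical heartbeat manager: only the poll-timer branch is modelled, no intervals or retries.
class SketchHeartbeatManager {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    final SketchPollTimer pollTimer;
    final List<Integer> sentEpochs = new ArrayList<>(); // epoch carried by each heartbeat "sent"
    int memberEpoch = 1;
    private boolean staled = false;

    SketchHeartbeatManager(SketchPollTimer pollTimer) {
        this.pollTimer = pollTimer;
    }

    // Background-thread poll.
    void poll(long nowMs) {
        if (pollTimer.isExpired(nowMs)) {
            if (!staled) {
                // Nothing asynchronous has to happen first, so the leave heartbeat is sent
                // right away; no PREPARE_LEAVING/LEAVING state is needed to trigger it.
                sentEpochs.add(LEAVE_GROUP_MEMBER_EPOCH);
                staled = true;
            }
            return; // send nothing more until the application polls again
        }
        sentEpochs.add(memberEpoch);
    }

    // Application thread called Consumer#poll(): restart the timer and rejoin with epoch 0.
    void onApplicationPoll(long nowMs) {
        pollTimer.reset(nowMs);
        if (staled) {
            memberEpoch = 0;
            staled = false;
        }
    }
}
```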






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1420576356


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
 return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
 }
 
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToStaled() {
+transitionTo(MemberState.PREPARE_LEAVING);
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+transitionTo(MemberState.LEAVING);

Review Comment:
   Similarly, do we need this? We use this LEAVING state to signal the HB 
manager that it should send a HB now, but in this staled-member case, that 
last HB is sent based on the check that the poll timer expired (not based on 
a state that signals the HB should be sent now).






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1420570473


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -587,6 +599,18 @@ public boolean shouldSkipHeartbeat() {
 return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
 }
 
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToStaled() {
+transitionTo(MemberState.PREPARE_LEAVING);

Review Comment:
   Is this really needed here? We use this state when we need to take some 
action before sending the last HB (mainly callbacks), but in this case there 
is no async operation to do, we just send the last HB, right?






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


lucasbru commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1847153287

   More generally, can you enable the integration tests for this PR?
   
   Can you also enable `testMaxPollIntervalMs`?





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-08 Thread via GitHub


lucasbru commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1846962550

   There is a suspicious test failure: 
   
   
https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=Europe%2FVienna=kafka.api.PlaintextConsumerTest=testRemoteAssignorRange(String%2C%20String)%5B1%5D
   
   It doesn't seem to fail on master at all, but it fails consistently for this 
PR. It failed on one other PR once. We probably have to look into this.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-07 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1845782589

   There are quite a few failing tests, so I rebased to try to resolve them.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-07 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1418729793


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
-private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+/**
+ * When consumer polls, we need to reset the pollTimer.  If the poll timer has expired, we rejoin only when the
+ * member is in the {@link MemberState#UNSUBSCRIBED} state.
+ */
+public void resetPollTimer() {
+pollTimer.reset(maxPollIntervalMs);
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+ final boolean ignoreResponse) {
+NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();
+return request;
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-coordinatorRequestManager.coordinator());
+new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+coordinatorRequestManager.coordinator());
 return request.whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+// The response is only ignore when the member becomes staled. This is because the member needs to
+// rejoin on the next poll regardless the server has responded to the heartbeat.
+if (!ignoreResponse) {
+membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+maybeSendGroupMetadataUpdateEvent();

Review Comment:
   If the heartbeat request manager ignores the response, why does it call 
`onResponse()`? Does the heartbeat request manager continue to send heartbeats 
although the consumer left the group? If yes, what is the reason for it? I 
would have expected that the consumer leaves the group and stops heartbeating. 
Then at the next call to `Consumer#poll()` it should join the group again.
   
   Another question: how does the consumer avoid committing offsets (or 
sending any other group-related request) when the poll timer has expired and 
the consumer is in the staled state? The broker will probably reject the 
commit, but the consumer proactively left the group, so it knows that it is no 
longer part of the group and could simply skip the offset commit.
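One way to answer the second question, sketched under the assumption of a hypothetical commit path (`SketchCommitGuard` and `CommitSketchState` are illustrative, and `IllegalStateException` is a placeholder, not necessarily what the consumer would surface): check the member state before building the request and fail the future locally when the member is stale, so no OffsetCommit request reaches the broker.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical subset of member states relevant to committing.
enum CommitSketchState { STABLE, STALED }

// Hypothetical commit path that consults membership before sending anything.
class SketchCommitGuard {
    CommitSketchState state = CommitSketchState.STABLE;
    int requestsSent = 0; // counts commits that would have gone to the broker

    CompletableFuture<Void> commitAsync() {
        if (state == CommitSketchState.STALED) {
            // The member already left the group on its own, so fail fast without a round trip.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                    "Member left the group after the poll timer expired; poll again to rejoin"));
            return failed;
        }
        requestsSent++; // stand-in for enqueueing an OffsetCommit request
        return CompletableFuture.completedFuture(null);
    }
}
```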






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-07 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1418729793


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
-private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+/**
+ * When consumer polls, we need to reset the pollTimer.  If the poll timer has expired, we rejoin only when the
+ * member is in the {@link MemberState#UNSUBSCRIBED} state.
+ */
+public void resetPollTimer() {
+pollTimer.reset(maxPollIntervalMs);
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+ final boolean ignoreResponse) {
+NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();
+return request;
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-coordinatorRequestManager.coordinator());
+new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+coordinatorRequestManager.coordinator());
 return request.whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+// The response is only ignore when the member becomes staled. This is because the member needs to
+// rejoin on the next poll regardless the server has responded to the heartbeat.
+if (!ignoreResponse) {
+membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+maybeSendGroupMetadataUpdateEvent();

Review Comment:
   If the heartbeat request manager ignores the response, why does it call 
`onResponse()`? Does the heartbeat request manager continue to send heartbeats 
although the consumer left the group? If yes, what is the reason for it? 
   
   Another question: how does the consumer avoid committing offsets when the 
poll timer has expired and the consumer is in the staled state? The broker 
will probably reject the commit, but the consumer proactively left the group, 
so it knows that it is no longer part of the group and could simply skip the 
offset commit.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1843583099

   Hi @lianetm @cadonna @lucasbru - Thanks for taking the time to review my PR. 
I addressed your comments and left one question for @cadonna.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1417898446


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();

Review Comment:
   You are 100% correct. I submitted a patch for this specific test. Thanks for 
catching this.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1417795477


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -201,13 +224,35 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
-private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+/**
+ * When consumer polls, we need to reset the pollTimer.  If the poll timer has expired, we rejoin only when the
+ * member is in the {@link MemberState#UNSUBSCRIBED} state.
+ */
+public void resetPollTimer() {
+pollTimer.reset(maxPollIntervalMs);
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+ final boolean ignoreResponse) {
+NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();
+return request;
+}
+
+private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
 NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-coordinatorRequestManager.coordinator());
+new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+coordinatorRequestManager.coordinator());
 return request.whenComplete((response, exception) -> {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+// The response is only ignore when the member becomes staled. This is because the member needs to
+// rejoin on the next poll regardless the server has responded to the heartbeat.
+if (!ignoreResponse) {
+membershipManager.onHeartbeatResponseReceived(((ConsumerGroupHeartbeatResponse) response.responseBody()).data());
+maybeSendGroupMetadataUpdateEvent();

Review Comment:
   @cadonna - can you comment on this change?






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


lianetm commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1843296141

   @philipnee agree on letting the membership manager determine the epoch to 
use to leave the group. That's already in the [state machine](https://github.com/apache/kafka/blob/adcbdaca72a3484d729e79061b947bcb9c71533a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L568) 
PR, which is in review but not merged yet.
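A minimal sketch of the decision being discussed, assuming the KIP-848 convention that a dynamic member leaves with epoch -1 while a static member (one configured with a `group.instance.id`) sends -2 to signal that it expects to come back. `LeaveEpochSketch` is a hypothetical helper; in the client this choice would live in the membership manager rather than in the heartbeat manager.

```java
import java.util.Optional;

// Hypothetical helper choosing the epoch sent in the final (leave) heartbeat.
final class LeaveEpochSketch {
    // Dynamic member: leaves the group for good.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    // Static member: leaves temporarily and is expected to rejoin with the same instance id.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    private LeaveEpochSketch() {
    }

    static int leaveEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
                ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
                : LEAVE_GROUP_MEMBER_EPOCH;
    }
}
```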





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416992430


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
+/**
+ * When consumer polls, we need to reset the pollTimer.  If member is already leaving the group
+ */
+public void ack() {
+pollTimer.reset(rebalanceTimeoutMs);
+}
+
+Timer pollTimer() {

Review Comment:
   I was assuming that once the poll timer is passed in from the outside, we 
don't need to make this visible for testing anymore.
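A sketch of the injection pattern, assuming hypothetical names (`InjectedTimer`, `InjectedTimerManager`) and a `LongSupplier` clock in place of Kafka's own `Time`/`Timer` utilities: because the caller constructs the timer and hands it to the manager, a test can keep its own reference and inspect it directly, so no package-private `pollTimer()` accessor is required.

```java
import java.util.function.LongSupplier;

// Hypothetical timer that reads the current time from an injected clock.
class InjectedTimer {
    private final LongSupplier clock;
    private long deadlineMs;

    InjectedTimer(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        reset(timeoutMs);
    }

    void reset(long timeoutMs) {
        deadlineMs = clock.getAsLong() + timeoutMs;
    }

    boolean isExpired() {
        return clock.getAsLong() >= deadlineMs;
    }
}

// Hypothetical manager that receives its poll timer instead of creating it.
class InjectedTimerManager {
    private final InjectedTimer pollTimer;
    private final long maxPollIntervalMs;

    InjectedTimerManager(InjectedTimer pollTimer, long maxPollIntervalMs) {
        this.pollTimer = pollTimer;
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    // Called when the application polls.
    void resetPollTimer() {
        pollTimer.reset(maxPollIntervalMs);
    }

    boolean pollTimerExpired() {
        return pollTimer.isExpired();
    }
}
```

In a test, the same `InjectedTimer` instance is built with a controllable clock, passed to the manager, and then asserted on directly.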



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -137,4 +142,15 @@ public interface MembershipManager {
  * Note that list of topics of the subscription is taken from the shared subscription state.
  */
 void onSubscriptionUpdated();
+
+/**
+ * Transition to the {@link MemberState#JOINING} state to attempt joining a group.
+ */
+void transitionToJoining();
+
+/**
+ * When the user stops polling the consumer, the member will be transitioned to LEAVING without revoking the

Review Comment:
   How does the comment about `LEAVING` relate to this method? Maybe it needs 
to be updated?






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416983368


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   Oh, I was not aware of the static membership handling in KIP-848. I should 
have also looked into that instead of only looking into the legacy consumer. 
Thanks for the clarification!
   
   Yes, I agree that consumers that use group management should always send a 
leave group as KIP-848 describes.
   
   So the comment is wrong. I would propose to just remove the whole comment 
and give the timer a descriptive name. In general I find inline comments 
problematic because at some point they start to lie, since they are never 
properly maintained. Instead I prefer to use meaningful and descriptive names 
and to extract code that needs a comment into methods that either have a 
meaningful name or a comment on the method, preferring the meaningful name. 
What I just wrote applies in most cases, though not all, but it definitely 
applies here.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416983368


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   Oh, I was not aware of the static membership handling in KIP-848. Should 
have also looked into that instead of only looking at the legacy consumer. 
Thanks for the clarification!
   
   Yes, I agree that consumers that use group management should always send a 
leave group as KIP-848 describes.
   
   So the comment is wrong. I would propose to just remove the whole comment 
and give the timer a descriptive name. In general I find inline comments 
problematic because at some point they start to lie, since they are never 
properly maintained. Instead I prefer to use meaningful and descriptive names 
and to extract code that needs a comment into methods that either have a 
meaningful name or a comment on the method, preferring the meaningful name. 
What I just wrote applies in most cases, though not all, but it definitely 
applies here.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-06 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416983368


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   Oh, I was not aware of the static membership handling in KIP-848. Thanks for 
the clarification!
   
   Yes, I agree that consumers that use group management should always send a 
leave group as KIP-848 describes.
   
   So the comment is wrong. I would propose to just remove the whole comment 
and give the timer a descriptive name. In general I find inline comments 
problematic because at some point they start to lie, since they are never 
properly maintained. Instead I prefer to use meaningful and descriptive names 
and to extract code that needs a comment into methods that either have a 
meaningful name or a comment on the method, preferring the meaningful name. 
What I just wrote applies in most cases, though not all, but it definitely 
applies here.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842164667

   Hi @lianetm - Thanks for taking the time to review the PR. Instead of using 
boolean flags, I added a _STALED_ state to address your concern. I think we do 
need this state to stay consistent with the state logic, i.e., I agree that 
unsubscribed is probably not the right state to be in, as it hints to the user 
that it needs to subscribe. Let me know what you think.
   
   For the static member - I think the right thing to do is to let the 
membershipManager decide the right epoch to send.
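A sketch of the distinction being drawn, assuming a hypothetical reduced state set (`RejoinSketchState` and `RejoinSketch` are illustrative, not the PR's `MemberState`): a member that went stale because of `max.poll.interval.ms` rejoins on the next `poll()` with its existing subscription, whereas an unsubscribed member stays put until the user subscribes again.

```java
// Hypothetical subset of states relevant to rejoining after a poll-timer expiry.
enum RejoinSketchState { UNSUBSCRIBED, STALED, JOINING, STABLE }

// Hypothetical membership model showing what the next application poll does in each state.
class RejoinSketch {
    RejoinSketchState state = RejoinSketchState.STABLE;

    void onPollTimerExpired() {
        state = RejoinSketchState.STALED;
    }

    void onApplicationPoll() {
        switch (state) {
            case STALED:
                // Left only because the application stopped polling: rejoin automatically.
                state = RejoinSketchState.JOINING;
                break;
            case UNSUBSCRIBED:
                // Left because the user unsubscribed: wait for an explicit subscribe().
                break;
            default:
                // Other states are unaffected by a poll in this sketch.
                break;
        }
    }
}
```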





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842032321

   Thanks for the changes @philipnee, left answers for the static membership 
and some comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416614219


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();

Review Comment:
   I know we've all been writing this kind of unit test that drifts into 
integration-test territory, but if we want to start correcting that, this unit 
test could be simplified. Since it is a test for the HB manager, it should just 
`verify` the call to the membershipManager function when the timer expires 
(plus a separate test in the MembershipManager for that onStale function, 
which I would say is needed anyway).
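A sketch of the narrower test, assuming a hypothetical single-method interface (`StaleTransition`) and a hand-rolled fake instead of Mockito so the example is self-contained; with Mockito, the counter check would become `verify(membershipManager).transitionToStaled()`. The heartbeat-manager side only checks that the transition is requested once when the timer expires; what `transitionToStaled()` does to state and assignment would belong in a separate MembershipManager test.

```java
// Hypothetical narrow view of the membership manager: just the call under test.
interface StaleTransition {
    void transitionToStaled();
}

// Hand-rolled fake playing the role of Mockito's verify(...): it only counts calls.
class RecordingMembership implements StaleTransition {
    int staledCalls = 0;

    @Override
    public void transitionToStaled() {
        staledCalls++;
    }
}

// Hypothetical heartbeat manager reduced to the branch that reacts to poll-timer expiry.
class ExpiryCheckingHeartbeatManager {
    private final StaleTransition membership;
    private final long maxPollIntervalMs;
    private long lastApplicationPollMs;
    private boolean notified = false;

    ExpiryCheckingHeartbeatManager(StaleTransition membership, long maxPollIntervalMs, long nowMs) {
        this.membership = membership;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastApplicationPollMs = nowMs;
    }

    void poll(long nowMs) {
        if (!notified && nowMs - lastApplicationPollMs >= maxPollIntervalMs) {
            membership.transitionToStaled(); // the single interaction the unit test should verify
            notified = true;
        }
    }

    void resetPollTimer(long nowMs) {
        lastApplicationPollMs = nowMs;
        notified = false;
    }
}
```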






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416609139


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,51 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();
+time.sleep(1);
+// Sending first heartbeat and transitioning to stable
+assertHeartbeat(heartbeatRequestManager);
+assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+// Expires the poll timer, ensure sending a leave group
+time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+assertLeaveGroup(heartbeatRequestManager);
+assertTrue(heartbeatRequestManager.pollTimer().isExpired());
+// Poll again, ensure we heartbeat again.
+time.sleep(1);
+heartbeatRequestManager.resetPollTimer();
+assertHeartbeat(heartbeatRequestManager);
+assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+}
+
+private void assertHeartbeat(HeartbeatRequestManager hrm) {
+System.out.println("assertHeartbeat");
+NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
+assertEquals(1, pollResult.unsentRequests.size());
+assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs);
+pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0),
+Errors.NONE));
+}
+
+private void assertLeaveGroup(HeartbeatRequestManager hrm) {
+NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
+assertEquals(1, pollResult.unsentRequests.size());
+ConsumerGroupHeartbeatRequestData data = (ConsumerGroupHeartbeatRequestData) pollResult.unsentRequests.get(0).requestBuilder().build().data();
+assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, data.memberEpoch());
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

Review Comment:
   Is this really the right end state when the poll timer expires? It means that the user would have to call `subscribe` to join the group again. Is that what the old coordinator requires? (I expected the old code to just require the consumer to be polled again to rejoin the group. If my understanding is right, then we're missing logic after sending the last HB to leave: on timer expiration we should send the last HB and call transitionToJoining, so that the member rejoins the group on the next poll.)
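   
   A minimal sketch of the flow suggested here, assuming a heavily simplified single-threaded model. `RejoinSketch`, `SketchState`, `heartbeatTick()` and `onConsumerPoll()` are hypothetical names, not the real `MembershipManagerImpl` or `HeartbeatRequestManager` API. On expiry the member sends a leave heartbeat (epoch -1) and moves to JOINING rather than UNSUBSCRIBED; heartbeats stay paused until the application polls again, and then the join heartbeat (epoch 0) goes out without another `subscribe()` call.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical, simplified states; only the transitions discussed above are modelled.
   enum SketchState { STABLE, JOINING, UNSUBSCRIBED }
   
   final class RejoinSketch {
       SketchState state = SketchState.STABLE;
       boolean pollTimerExpired = false;
       // Member epochs of the heartbeats "sent" so far (-1 = leave, 0 = join).
       final List<Integer> sentEpochs = new ArrayList<>();
   
       // Background thread notices that max.poll.interval.ms has elapsed.
       void onPollTimerExpired() {
           pollTimerExpired = true;
           sentEpochs.add(-1);          // last heartbeat: leave the group
           state = SketchState.JOINING; // not UNSUBSCRIBED, so no new subscribe() is needed
       }
   
       // Application calls poll() again: the poll timer is reset.
       void onConsumerPoll() {
           pollTimerExpired = false;
       }
   
       // One iteration of the background heartbeat loop.
       void heartbeatTick() {
           if (pollTimerExpired) {
               return; // stay quiet until the application polls again
           }
           if (state == SketchState.JOINING) {
               sentEpochs.add(0);          // epoch 0 = join the group
               state = SketchState.STABLE; // simplification: the join completes immediately
           }
       }
   }
   ```
   
   Had the end state been UNSUBSCRIBED, `heartbeatTick()` would send nothing after `onConsumerPoll()` until the application called `subscribe()` again, which is the concern raised in this comment.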






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416605360


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   My take is that we should always send the leave group heartbeat when the poll timer expires (whether group.instance.id is set or not). Dynamic members should send epoch -1 and static members -2, as described in the KIP. The [documentation of the max.poll.interval](https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms) is just describing what will happen on the broker side (if a static member leaves, the broker won't reassign its partitions until the session timeout expires; if a dynamic member leaves, it will reassign them right away).
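   
   A small sketch of the rule proposed here, assuming only what the comment states: the member always leaves on poll-timer expiry, and only the epoch depends on whether `group.instance.id` is configured. The class and method names are hypothetical; the -1 value matches `ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH`, and -2 is the static-member value described in KIP-848.
   
   ```java
   import java.util.Optional;
   
   // Hypothetical helper: which member epoch the final (leave) heartbeat carries.
   final class LeaveEpochSketch {
       // Dynamic member leaving the group for good.
       static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
       // Static member leaving temporarily; it may rejoin within the session timeout.
       static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;
   
       private LeaveEpochSketch() { }
   
       static int leaveEpoch(Optional<String> groupInstanceId) {
           // The leave heartbeat is always sent; only the epoch differs.
           return groupInstanceId.isPresent()
               ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
               : LEAVE_GROUP_MEMBER_EPOCH;
       }
   }
   ```
   
   With -2 the coordinator holds the static member's partitions until the session timeout expires; with -1 it can reassign them right away, which is the broker-side difference the max.poll.interval.ms documentation describes.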






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1841786386

   Hi @lianetm @cadonna @lucasbru, I've addressed your comments. I've left a question about sending a leave group for static members: from reading the KIP, it seems like we should do it, but I think we need further clarification.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416388581


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   I think you meant: only when `group.instance.id` is not set, i.e. for dynamic members?
   
   This is the current implementation:
   ```
   // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
   // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
   // and the membership expiration is only controlled by session timeout.
   if (isDynamicMember() && !coordinatorUnknown() &&
       state != MemberState.UNJOINED && generation.hasMemberId()) {
       // this is a minimal effort attempt to leave the group. we do not
       // attempt any resending if the request fails or times out.
       log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
           generation.memberId, coordinator, leaveReason);
       LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
           rebalanceConfig.groupId,
           Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
       );
   
       future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
       client.pollNoWakeup();
   }
   ```






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416383477


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   Well, I'm not sure, actually. According to KIP-848: `Static membership, 
introduced in KIP-345, is still supported by this new rebalance protocol. When 
a member wants to leave temporary – e.g. while being bounced – it should send 
an heartbeat with a member epoch equals to -2. This signals to the group 
coordinator that the member left but will rejoin within the session timeout. 
When the member rejoins with the same instance ID, the group coordinator 
replaces the old member by the new member and gives back its current 
assignment.`
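   
   To make the quoted behaviour concrete, here is a toy coordinator-side model, assuming a single group and ignoring everything except timing. `StaticMemberSketch`, `onTemporaryLeave` and `onRejoin` are hypothetical names, not the group coordinator's API. A member that leaves with epoch -2 keeps its assignment for one session timeout; rejoining with the same instance ID inside that window gets the old assignment back.
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical toy model of KIP-848 static-member replacement (timing only).
   final class StaticMemberSketch {
       private static final class Departed {
           final List<String> assignment;
           final long expiresAtMs;
   
           Departed(List<String> assignment, long expiresAtMs) {
               this.assignment = assignment;
               this.expiresAtMs = expiresAtMs;
           }
       }
   
       private final long sessionTimeoutMs;
       private final Map<String, Departed> departed = new HashMap<>();
   
       StaticMemberSketch(long sessionTimeoutMs) {
           this.sessionTimeoutMs = sessionTimeoutMs;
       }
   
       // Heartbeat with member epoch -2: hold the assignment for one session timeout.
       void onTemporaryLeave(String instanceId, List<String> assignment, long nowMs) {
           departed.put(instanceId, new Departed(assignment, nowMs + sessionTimeoutMs));
       }
   
       // Rejoin with the same instance ID: return the held assignment if still in time.
       List<String> onRejoin(String instanceId, long nowMs) {
           Departed d = departed.remove(instanceId);
           if (d != null && nowMs < d.expiresAtMs) {
               return d.assignment;
           }
           return List.of(); // too late: treated as a brand-new member in this sketch
       }
   }
   ```
   
   In the real protocol the partitions of an expired static member would be reassigned to other members; the sketch only returns an empty assignment to keep the example small.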
   
   






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416380771


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();

Review Comment:
   It is not possible to transition to fenced from unsubscribed.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416378676


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
+/**
+ * When consumer polls, we need to reset the pollTimer.  If member is already leaving the group
+ */
+public void ack() {

Review Comment:
   The code is updated, but note that the pollTimer is updated on a regular cadence by HeartbeatRequestManager.poll(). This function is used to check whether the timer has expired. If it has, we reset the timer every time we poll the consumer.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1415377837


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
 this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
 this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
 retryBackoffMaxMs, rebalanceTimeoutMs);
+this.pollTimer = time.timer(rebalanceTimeoutMs);
 }
 
 // Visible for testing
 HeartbeatRequestManager(
 final LogContext logContext,
+final Time time,

Review Comment:
   Why not pass in the poll timer directly, instead of the mock time that is only used to create the poll timer? By passing in the poll timer, you can also remove the method `pollTimer()`, since it is only needed for testing.
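   
   A sketch of the suggested constructor shape, assuming a tiny hypothetical `SketchTimer` interface in place of Kafka's `org.apache.kafka.common.utils.Timer` (all names here are illustrative). Because the timer is injected, a test can hand in a fake and drive expiry directly, and no `pollTimer()` accessor is needed.
   
   ```java
   // Hypothetical stand-in for the few Timer methods the heartbeat manager uses.
   interface SketchTimer {
       void update(long nowMs);
       boolean isExpired();
   }
   
   final class HeartbeatManagerSketch {
       private final SketchTimer pollTimer;
   
       // In the real class the parameter would be Kafka's Timer (e.g. built with
       // time.timer(maxPollIntervalMs)); here a hypothetical interface stands in.
       HeartbeatManagerSketch(SketchTimer pollTimer) {
           this.pollTimer = pollTimer;
       }
   
       boolean shouldHeartbeat(long nowMs) {
           pollTimer.update(nowMs);
           return !pollTimer.isExpired();
       }
   }
   
   // Test double: expiry is set explicitly instead of by sleeping a mock clock.
   final class FakePollTimer implements SketchTimer {
       boolean expired;
   
       @Override
       public void update(long nowMs) { }
   
       @Override
       public boolean isExpired() { return expired; }
   }
   ```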



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
 this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
 this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
 retryBackoffMaxMs, rebalanceTimeoutMs);
+this.pollTimer = time.timer(rebalanceTimeoutMs);

Review Comment:
   Yes, please rename.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+// Sending first heartbeat and transitioning to stable
+assertHeartbeat(heartbeatRequestManager);
+assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+// Expires the poll timer, and ensure heartbeat is not sent
+time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+assertNoHeartbeat(heartbeatRequestManager);

Review Comment:
   As I wrote before, I think the consumer should leave the group.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
+/**
+ * When consumer polls, we need to reset the pollTimer.  If member is already leaving the group
+ */
+public void ack() {
+pollTimer.reset(rebalanceTimeoutMs);
+}
+
+Timer pollTimer() {

Review Comment:
   See my comment above about directly passing the poll timer into the 
constructor. 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -166,11 +175,23 @@ public HeartbeatRequestManager(
  */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
+if (!coordinatorRequestManager.coordinator().isPresent() ||
+membershipManager.shouldSkipHeartbeat() ||
+pollTimer.isExpired()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 
+pollTimer.update(currentTimeMs);
+if (pollTimer.isExpired()) {
+logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+"was longer than the configured max.poll.interval.ms, which typically implies that " +
+"the poll loop is spending too much time processing messages. You can address this " +
+"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
+"returned in poll() with 

Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-04 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1414217500


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -166,11 +175,23 @@ public HeartbeatRequestManager(
  */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
+if (!coordinatorRequestManager.coordinator().isPresent() ||
+membershipManager.shouldSkipHeartbeat() ||
+pollTimer.isExpired()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 
+pollTimer.update(currentTimeMs);
+if (pollTimer.isExpired()) {
+logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+"was longer than the configured max.poll.interval.ms, which typically implies that " +
+"the poll loop is spending too much time processing messages. You can address this " +
+"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
+"returned in poll() with max.poll.records.");
+return PollResult.EMPTY;

Review Comment:
   What about, instead of returning EMPTY here, we transition the member to "leaving" (`transitionToSendingLeaveGroup`), and that's it? On the following lines that should make shouldHbNow true, so we would send the last HB and transition to unsubscribed (effectively leaving without triggering the callbacks, which is what the old logic does).
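   
   A minimal sketch of the suggested control flow, assuming a simplified model: `LeaveOnExpirySketch` and its states are hypothetical, and `poll()` here only reports whether a leave heartbeat is due (regular interval heartbeats are left out). On expiry the member is marked as leaving; the ordinary "heartbeat now" path then emits the final heartbeat, and the member ends up unsubscribed without any rebalance callbacks.
   
   ```java
   import java.util.Optional;
   
   // Hypothetical model of "transition to leaving, then let the normal path send the HB".
   final class LeaveOnExpirySketch {
       enum State { STABLE, LEAVING, UNSUBSCRIBED }
   
       State state = State.STABLE;
       boolean pollTimerExpired;
   
       // Returns the member epoch of the leave heartbeat if one is due on this iteration.
       Optional<Integer> poll() {
           if (pollTimerExpired && state == State.STABLE) {
               state = State.LEAVING; // stands in for transitionToSendingLeaveGroup()
           }
           boolean heartbeatNow = state == State.LEAVING; // leaving forces an immediate heartbeat
           if (!heartbeatNow) {
               return Optional.empty(); // regular interval heartbeats omitted from this sketch
           }
           state = State.UNSUBSCRIBED; // leave without invoking rebalance callbacks
           return Optional.of(-1);     // -1 = leave-group member epoch
       }
   }
   ```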






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-04 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1414217500


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -166,11 +175,23 @@ public HeartbeatRequestManager(
  */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
+if (!coordinatorRequestManager.coordinator().isPresent() ||
+membershipManager.shouldSkipHeartbeat() ||
+pollTimer.isExpired()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 
+pollTimer.update(currentTimeMs);
+if (pollTimer.isExpired()) {
+logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+"was longer than the configured max.poll.interval.ms, which typically implies that " +
+"the poll loop is spending too much time processing messages. You can address this " +
+"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
+"returned in poll() with max.poll.records.");
+return PollResult.EMPTY;

Review Comment:
   What about, instead of returning EMPTY here, we transition the member to "leaving", and that's it? On the following lines that should make shouldHbNow true, so we would send the last HB and transition to unsubscribed (effectively leaving without triggering the callbacks, which is what the old logic does).






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-04 Thread via GitHub


lucasbru commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1414200098


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
+/**
+ * When consumer polls, we need to reset the pollTimer.  If member is already leaving the group
+ */
+public void ack() {

Review Comment:
   Is the timer up to date at this point? If not, `deadlineMs` will be set incorrectly. Maybe pass the current time into this function and update the timer before setting the timeout, like we do for auto-commit.
   
   Also, a better name, please: `resetPollTimer`?
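   
   The concern can be sketched with a timer that caches "now", loosely modelled on Kafka's `Timer`, whose `reset` (as this comment implies) computes `deadlineMs` from its cached current time. `CachedTimer` and `PollTimerSketch` are hypothetical names. Resetting without an `update` first anchors the deadline to a stale time, whereas the suggested `resetPollTimer(currentTimeMs)` refreshes the timer before resetting it.
   
   ```java
   // Hypothetical timer that caches the current time, loosely modelled on Kafka's Timer.
   final class CachedTimer {
       private long currentTimeMs;
       private long deadlineMs;
   
       CachedTimer(long nowMs, long timeoutMs) {
           this.currentTimeMs = nowMs;
           this.deadlineMs = nowMs + timeoutMs;
       }
   
       void update(long nowMs) {
           currentTimeMs = Math.max(currentTimeMs, nowMs);
       }
   
       // The deadline is based on the cached time, so it is only right after a fresh update().
       void reset(long timeoutMs) {
           deadlineMs = currentTimeMs + timeoutMs;
       }
   
       boolean isExpired() {
           return currentTimeMs >= deadlineMs;
       }
   
       long deadlineMs() {
           return deadlineMs;
       }
   }
   
   final class PollTimerSketch {
       final CachedTimer pollTimer;
       private final long maxPollIntervalMs;
   
       PollTimerSketch(long nowMs, long maxPollIntervalMs) {
           this.maxPollIntervalMs = maxPollIntervalMs;
           this.pollTimer = new CachedTimer(nowMs, maxPollIntervalMs);
       }
   
       // Suggested shape: take the caller's time and refresh the timer before resetting it.
       void resetPollTimer(long currentTimeMs) {
           pollTimer.update(currentTimeMs);
           pollTimer.reset(maxPollIntervalMs);
       }
   }
   ```
   
   In the stale case, an application polling at t=1000 against a timer last updated at t=0 would get a deadline of 300, so the timer would already look expired on its next update.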



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
 this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
 this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
 retryBackoffMaxMs, rebalanceTimeoutMs);
+this.pollTimer = time.timer(rebalanceTimeoutMs);

Review Comment:
   Is rebalanceTimeoutMs == maxPollIntervalMs? Maybe rename it in that case.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();

Review Comment:
   You could consider using `membershipManager.transitionToFenced(); // An indirect way of moving to JOINING state` instead of changing the visibility of prod code.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -428,9 +428,9 @@ private void updateSubscription(Collection assignedPartitions,
  * Transition to the {@link MemberState#JOINING} state, indicating that the member will
  * try to join the group on the next heartbeat request. This is expected to be invoked when
  * the user calls the subscribe API, or when the member wants to rejoin after getting fenced.
- * Visible for testing.

Review Comment:
   I think it's still visible for testing only



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
+/**
+ * When consumer polls, we need to reset the pollTimer.  If member is already leaving the group
+ */
+public void ack() {
+pollTimer.reset(rebalanceTimeoutMs);
+}
+
+Timer pollTimer() {

Review Comment:
   // visible for testing






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-03 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1837908142

   Hi @lianetm @lucasbru, could I ask for a review of this PR? The modifications were made based on our discussion.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1832745572

   In the current implementation, when the poll timer expires, the consumer sends a LeaveGroup request without revoking the partitions. This is a bit different from the current state transition, so I'm not sure what the right thing to do is.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1409899121


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 
+pollTimer.update(currentTimeMs);
+if (!isLeaveGroupInProgress() && pollTimer.isExpired()) {
+logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+"was longer than the configured max.poll.interval.ms, which typically implies that " +
+"the poll loop is spending too much time processing messages. You can address this " +
+"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
+"returned in poll() with max.poll.records.");
+membershipManager.leaveGroup().whenComplete((ignored, exception) -> {

Review Comment:
   What exactly should happen in KIP-848 if the poll timer expires? The current implementation sends a LeaveGroup request directly, without revoking partitions. If we want to do that, we need to modify the state transition a bit.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1409862992


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -169,6 +174,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 
+pollTimer.update(currentTimeMs);
+if (!isLeaveGroupInProgress() && pollTimer.isExpired()) {
+logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+"was longer than the configured max.poll.interval.ms, which typically implies that " +
+"the poll loop is spending too much time processing messages. You can address this " +
+"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
+"returned in poll() with max.poll.records.");
+membershipManager.leaveGroup().whenComplete((ignored, exception) -> {

Review Comment:
   Not sure if KIP-848 changes this, but:
   ```
   // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
   // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
   // and the membership expiration is only controlled by session timeout.
   ```
   






[PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-11-29 Thread via GitHub


philipnee opened a new pull request, #14873:
URL: https://github.com/apache/kafka/pull/14873

   Currently, the poll interval is not respected during consumer#poll. We need to make sure the consumer actively leaves the group when the user doesn't poll frequently enough. This PR adds (1) a poll timer and (2) a boolean flag that triggers a rejoin on the subsequent poll.
   - The poll timer is always configured with the rebalance timeout.
   - The boolean flag, rejoinOnPoll, is set when the leaveGroup completes, so that the subsequent poll rejoins the group.
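   
   A minimal sketch of the two pieces described above, assuming a single-threaded model with explicit timestamps. `RejoinOnPollSketch`, `onLeaveGroupComplete()` and `onConsumerPoll()` are hypothetical names, and the `joinRequests` counter stands in for moving the member back to JOINING.
   
   ```java
   // Hypothetical model: a poll timer sized from the rebalance timeout plus a
   // rejoinOnPoll flag that the next application poll consumes.
   final class RejoinOnPollSketch {
       private final long rebalanceTimeoutMs;
       private long deadlineMs;
       private boolean rejoinOnPoll;
       int joinRequests;
   
       RejoinOnPollSketch(long nowMs, long rebalanceTimeoutMs) {
           this.rebalanceTimeoutMs = rebalanceTimeoutMs;
           this.deadlineMs = nowMs + rebalanceTimeoutMs;
       }
   
       boolean pollTimerExpired(long nowMs) {
           return nowMs >= deadlineMs;
       }
   
       // Completion callback of the leave-group request.
       void onLeaveGroupComplete() {
           rejoinOnPoll = true;
       }
   
       // Application calls consumer.poll(): restart the timer and rejoin if flagged.
       void onConsumerPoll(long nowMs) {
           deadlineMs = nowMs + rebalanceTimeoutMs;
           if (rejoinOnPoll) {
               rejoinOnPoll = false;
               joinRequests++; // stands in for transitioning the member back to JOINING
           }
       }
   }
   ```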

