Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-23 Thread via GitHub


lucasbru commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1500478736


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,13 +232,22 @@ public MembershipManager membershipManager() {
      * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
      * responsive to changes.
      *
+     * Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
+     * our poll timer will not expire while we are polling.
+     *
      * In the event that heartbeats are currently being skipped, this still returns the next heartbeat
      * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
      */
     @Override
     public long maximumTimeToWait(long currentTimeMs) {
-        boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
-        return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+        pollTimer.update(currentTimeMs);
+        if (
+            pollTimer.isExpired() ||
+                (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())
+        ) {
+            return 0L;
+        }
+        return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));

Review Comment:
   It's somewhat arbitrary. We need to make sure that the application thread 
doesn't block for so long that the poll timer expires. The idea is to unblock 
it some time before the timer expires, so that it can send an event telling the 
background thread that it's still polling, and so that the background thread 
has time to process the event and reset the poll timer.
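
   To make the halving concrete, here is a minimal, self-contained sketch with 
example numbers. It is not the actual `HeartbeatRequestManager` code: the class 
`MaxWaitSketch` and its plain `long`/`boolean` parameters are hypothetical 
stand-ins for the poll timer and the heartbeat state.

```java
// Hypothetical stand-in for the wait computation discussed above.
// Plain values replace the real poll timer and heartbeat request state.
class MaxWaitSketch {

    static long maximumTimeToWait(long pollTimerRemainingMs,
                                  long nextHeartbeatMs,
                                  boolean heartbeatNow) {
        // An expired poll timer or a pending heartbeat means: do not block at all.
        if (pollTimerRemainingMs <= 0 || heartbeatNow) {
            return 0L;
        }
        // Wake up after at most half of the remaining poll interval, so there is
        // still time left to send the poll event and have the timer reset.
        return Math.min(pollTimerRemainingMs / 2, nextHeartbeatMs);
    }

    public static void main(String[] args) {
        System.out.println(maximumTimeToWait(1000L, 3000L, false)); // 500
        System.out.println(maximumTimeToWait(1000L, 200L, false));  // 200
        System.out.println(maximumTimeToWait(0L, 3000L, false));    // 0
        System.out.println(maximumTimeToWait(1000L, 3000L, true));  // 0
    }
}
```

   With 1000 ms left on the poll timer, the application thread wakes up after 
500 ms at the latest, which leaves the other 500 ms as slack for the round trip 
to the background thread. Any divisor greater than 1 would leave some slack; 
2 is simply the choice made here.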



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-22 Thread via GitHub


mjsax commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1500151525


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,13 +232,22 @@ public MembershipManager membershipManager() {
      * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
      * responsive to changes.
      *
+     * Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
+     * our poll timer will not expire while we are polling.
+     *
      * In the event that heartbeats are currently being skipped, this still returns the next heartbeat
      * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
      */
     @Override
     public long maximumTimeToWait(long currentTimeMs) {
-        boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
-        return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+        pollTimer.update(currentTimeMs);
+        if (
+            pollTimer.isExpired() ||
+                (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())
+        ) {
+            return 0L;
+        }
+        return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));

Review Comment:
   For my own education: why `/ 2` ?






Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-20 Thread via GitHub


lucasbru merged PR #15372:
URL: https://github.com/apache/kafka/pull/15372





Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-19 Thread via GitHub


lucasbru commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1494847054


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000))
+
+    // Give enough time to rejoin
+    consumer.poll(Duration.ofMillis(500))
+    consumer.poll(Duration.ofMillis(500))
+
+    // Check that we did not rejoin

Review Comment:
   Done



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000))
+
+    // Give enough time to rejoin

Review Comment:
   Done






Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-19 Thread via GitHub


cadonna commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1494650399


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000))
+
+    // Give enough time to rejoin
+    consumer.poll(Duration.ofMillis(500))
+    consumer.poll(Duration.ofMillis(500))
+
+    // Check that we did not rejoin

Review Comment:
   Do we need this comment? I think it would be better to delete it and to 
rename `initialAssignedCalls` to something more meaningful like 
`callsToAssignedAfterFirstRebalance` or `callsToAssignedBeforePolls`.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000))
+
+    // Give enough time to rejoin

Review Comment:
   nit:
   This comment is a bit confusing. What is it supposed to clarify?






Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on PR #15372:
URL: https://github.com/apache/kafka/pull/15372#issuecomment-1948876670

   Hey @mjsax. This PR is currently waiting for @cadonna's review. If you have 
time, you could take a look as a way to get into the KIP-848 work.





Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-16 Thread via GitHub


lianetm commented on PR #15372:
URL: https://github.com/apache/kafka/pull/15372#issuecomment-1948611367

   I like the last commit msg :)





Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-16 Thread via GitHub


lucasbru commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1492266247


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000));

Review Comment:
   Done






Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-15 Thread via GitHub


lianetm commented on PR #15372:
URL: https://github.com/apache/kafka/pull/15372#issuecomment-1946894520

   thanks for the changes! lgtm.





Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-15 Thread via GitHub


lianetm commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1491444058


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000));

Review Comment:
   nit: semicolon not needed here, same in the next 2 lines






Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-15 Thread via GitHub


lucasbru commented on PR #15372:
URL: https://github.com/apache/kafka/pull/15372#issuecomment-1945629703

   @cadonna could you please have a look?





[PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-14 Thread via GitHub


lucasbru opened a new pull request, #15372:
URL: https://github.com/apache/kafka/pull/15372

   The consumer keeps a poll timer, which is used to ensure liveness of the 
application thread. In the legacy consumer, the poll timer is updated 
automatically while the `Consumer.poll(Duration)` method is blocked, whereas 
the new consumer only updates the poll timer when a new call to 
`Consumer.poll(Duration)` is issued. This means that the 
kafka-console-consumer.sh tool, which uses a very long timeout by default, 
behaves differently with the new consumer: the consumer proactively rejoins 
the group during long poll timeouts.
   
   This change solves the problem by (a) repeatedly sending 
`PollApplicationEvent`s to the background thread, not just on the first call 
of `poll`, and (b) making sure that the application thread doesn't block for 
so long that it runs out of `max.poll.interval.ms`.
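
   As a rough illustration of (a) and (b) together, the following sketch 
simulates a single long `poll` call on a virtual clock. It is not the consumer 
implementation: the class `PollLoopSketch`, the method `pollTimerExpires`, and 
the `resendPollEvents` flag are hypothetical, and "sending a poll event" is 
reduced to resetting a deadline.

```java
// Hypothetical simulation of one long poll() call using simulated time.
// No Kafka classes are involved; the poll timer is just a deadline in ms.
class PollLoopSketch {

    /**
     * Returns true if the simulated poll timer expires before the poll returns.
     * Assumes maxPollIntervalMs is positive.
     */
    static boolean pollTimerExpires(long pollTimeoutMs,
                                    long maxPollIntervalMs,
                                    boolean resendPollEvents) {
        long now = 0L;
        long pollDeadline = now + maxPollIntervalMs; // timer reset when poll() is entered
        long end = now + pollTimeoutMs;

        while (now < end) {
            long remaining = pollDeadline - now;
            // (b) with the fix, block for at most half of the remaining poll interval;
            // without it, block for the whole poll timeout in one go.
            long wait = resendPollEvents ? Math.max(1L, remaining / 2) : end - now;
            wait = Math.min(wait, end - now);
            now += wait; // simulated blocking

            if (now >= pollDeadline) {
                return true; // poll timer expired: the member would leave and rejoin
            }
            if (resendPollEvents) {
                // (a) another poll event reaches the background thread, which resets the timer
                pollDeadline = now + maxPollIntervalMs;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Same numbers as the integration test: 2000 ms poll, 1000 ms max poll interval.
        System.out.println(pollTimerExpires(2000L, 1000L, false)); // true
        System.out.println(pollTimerExpires(2000L, 1000L, true));  // false
    }
}
```

   With the flag off, the single 2000 ms block overruns the 1000 ms interval. 
With the flag on, the thread wakes every 500 ms and the deadline keeps moving 
ahead of the clock.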
   
   An integration test is added to make sure that we do not rejoin the group 
when a long poll timeout is used with a low `max.poll.interval.ms`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

