lianetm commented on code in PR #21584:
URL: https://github.com/apache/kafka/pull/21584#discussion_r2962558237
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1251,6 +1251,18 @@ private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
return null;
}
+ /**
+ * Best-effort, non-blocking auto-commit used when assign() is called.
+ * Fires once without blocking; on a retriable error the timer is reset to
+ * rebalanceConfig.retryBackoffMs.
+ */
+ public void maybeAutoCommitOnAssignment() {
+ if (autoCommitEnabled) {
+ nextAutoCommitTimer.reset(autoCommitIntervalMs);
Review Comment:
If we reset here, wouldn't we be delaying the next interval-based commit
without really committing if this commit fails?
Given that we don't retry this one, I wonder if we should only alter the
interval once we're sure the commit completed (which, btw, is what we do on
the rebalance path: we reset the timer in onJoinComplete, so after the commit
completed in onJoinPrepare). Thoughts?
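A minimal sketch of the "reset only on success" idea, for illustration only.
The class, the Deadline type, and commitOnAssignment are hypothetical
stand-ins, not the actual ConsumerCoordinator code, and a CompletableFuture
is assumed in place of Kafka's internal RequestFuture:

```java
import java.util.concurrent.CompletableFuture;

public class AutoCommitTimerSketch {
    // Hypothetical stand-in for the coordinator's auto-commit timer.
    static final class Deadline {
        private long deadlineMs;

        Deadline(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }

        void reset(long nowMs, long intervalMs) {
            deadlineMs = nowMs + intervalMs;
        }

        long deadlineMs() {
            return deadlineMs;
        }
    }

    // Registers a callback on the (already fired) async commit without blocking.
    // The deadline is pushed out only if the commit completes successfully.
    static void commitOnAssignment(CompletableFuture<Void> commitResult,
                                   Deadline timer, long nowMs, long intervalMs) {
        commitResult.whenComplete((ignored, error) -> {
            if (error == null) {
                timer.reset(nowMs, intervalMs);
            }
            // On failure the deadline is left untouched, so the regular
            // interval-based auto-commit is not delayed by this attempt.
        });
    }

    public static void main(String[] args) {
        Deadline ok = new Deadline(1_000L);
        commitOnAssignment(CompletableFuture.completedFuture(null), ok, 5_000L, 5_000L);
        System.out.println(ok.deadlineMs()); // 10000: commit succeeded, timer reset

        Deadline failed = new Deadline(1_000L);
        CompletableFuture<Void> failure = new CompletableFuture<>();
        failure.completeExceptionally(new RuntimeException("commit failed"));
        commitOnAssignment(failure, failed, 5_000L, 5_000L);
        System.out.println(failed.deadlineMs()); // 1000: commit failed, timer unchanged
    }
}
```

The design point is only where the reset happens: inside the completion
callback rather than before the request is sent.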
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1251,6 +1251,18 @@ private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
return null;
}
+ /**
+ * Best-effort, non-blocking auto-commit used when assign() is called.
+ * Fires once without blocking; on a retriable error the timer is reset to
+ * rebalanceConfig.retryBackoffMs.
+ */
+ public void maybeAutoCommitOnAssignment() {
Review Comment:
Conceptually this is the same thing we already do before a rebalance
(auto-commit now, without considering the interval). Is there a reason why we
can't reuse the function used for that commit (maybeAutoCommitOffsetsAsync())?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -605,10 +605,10 @@ public void assign(Collection<TopicPartition> partitions) {
}
fetcher.clearBufferedDataForUnassignedPartitions(partitions);
- // make sure the offsets of topic partitions the consumer is unsubscribing from
- // are committed since there will be no following rebalance
+ // Best-effort async auto-commit for previously-assigned partitions.
+ // assign() does not block waiting for the commit to complete.
Review Comment:
Yeah, I agree with the approach of just fixing the bug we had for now
(do not consider the timer on this auto-commit), and following up to see if we
can do better, maybe consistent with the rebalance path approach. There we
trigger the auto-commit regardless of the interval, allow time to retry safely
(partitions are marked to ensure no fetch progress), and reset the interval
once the operation succeeds. Food for thought for the follow-up.
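For the follow-up, the rebalance-path style flow could be sketched roughly as
below. This is an assumption-laden illustration, not the existing
implementation: commitWithRetries and its parameters are hypothetical, a
bounded attempt count stands in for the retry time budget, and the marking of
partitions to stop fetch progress is not modeled.

```java
import java.util.function.BooleanSupplier;

public class RetryingAutoCommitSketch {
    // Tries the commit immediately (the interval is not consulted), retries up
    // to maxAttempts times, and returns the new auto-commit deadline:
    // nowMs + intervalMs if some attempt succeeded, otherwise the deadline
    // that was already in place.
    static long commitWithRetries(BooleanSupplier tryCommit, int maxAttempts,
                                  long currentDeadlineMs, long nowMs, long intervalMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (tryCommit.getAsBoolean()) {
                return nowMs + intervalMs; // reset only after a successful commit
            }
        }
        return currentDeadlineMs; // never succeeded: leave the interval alone
    }

    public static void main(String[] args) {
        // Hypothetical commit that fails twice and succeeds on the third attempt.
        int[] calls = {0};
        long afterSuccess = commitWithRetries(() -> ++calls[0] >= 3, 3, 1_000L, 5_000L, 5_000L);
        System.out.println(afterSuccess); // 10000

        // Same commit, but only two attempts are allowed, so it never succeeds.
        int[] fewerCalls = {0};
        long afterFailure = commitWithRetries(() -> ++fewerCalls[0] >= 3, 2, 1_000L, 5_000L, 5_000L);
        System.out.println(afterFailure); // 1000
    }
}
```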