nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1473153338


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1097,10 +1102,10 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             for (final Task task : commitNeededActiveTasks) {
                 if (!dirtyTasks.contains(task)) {
                     try {
-                        // for non-revoking active tasks, we should not enforce checkpoint
-                        // since if it is EOS enabled, no checkpoint should be written while
-                        // the task is in RUNNING tate
-                        task.postCommit(false);
+                        // we only enforce a checkpoint if the transaction buffers are full
+                        // to avoid unnecessary flushing of stores under EOS
+                        final boolean enforceCheckpoint = maxUncommittedStateBytes > -1 && tasks.approximateUncommittedStateBytes() >= maxUncommittedStateBytes;
+                        task.postCommit(enforceCheckpoint);

Review Comment:
   Same as above.
   
   That said, I'm not convinced this particular change is necessary. This code runs during a rebalance, and AFAIK no records are processed during a rebalance, so the transaction buffer can't have grown since we last checked it in the `StreamThread` main run-loop (i.e. `StreamThread#maybeCommit`).
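   To make the argument concrete, here is a minimal sketch of the predicate from the diff, pulled out into a hypothetical standalone class. `CheckpointPolicy` and `shouldEnforceCheckpoint` are illustrative names, not Kafka Streams code, and I'm assuming both byte counts are `long`s with `-1` meaning "no limit". Because the result depends only on the buffered byte count, evaluating it again during revocation, with no records processed in between, can only repeat the answer `maybeCommit` already acted on.

   ```java
   /**
    * Hypothetical sketch of the checkpoint-enforcement predicate from the diff.
    * Names and types are assumptions; this is not the Kafka Streams implementation.
    */
   public final class CheckpointPolicy {

       private CheckpointPolicy() {}

       /**
        * Returns true when a checkpoint (and therefore a store flush) should be forced.
        *
        * @param maxUncommittedStateBytes limit on buffered uncommitted bytes; -1 disables it (assumed)
        * @param approximateUncommittedStateBytes current estimate of buffered uncommitted bytes
        */
       public static boolean shouldEnforceCheckpoint(final long maxUncommittedStateBytes,
                                                     final long approximateUncommittedStateBytes) {
           return maxUncommittedStateBytes > -1
               && approximateUncommittedStateBytes >= maxUncommittedStateBytes;
       }

       public static void main(final String[] args) {
           final long limit = 1_000L;
           final long buffered = 1_200L;

           // Check made in the main run-loop (cf. StreamThread#maybeCommit).
           final boolean atRunLoop = shouldEnforceCheckpoint(limit, buffered);

           // No records are processed during the rebalance, so `buffered` is unchanged
           // and re-evaluating the predicate in handleRevocation gives the same answer.
           final boolean atRevocation = shouldEnforceCheckpoint(limit, buffered);

           System.out.println(atRunLoop + " " + atRevocation); // prints "true true"
           System.out.println(shouldEnforceCheckpoint(-1L, Long.MAX_VALUE)); // prints "false"
       }
   }
   ```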
   
   Worst case: if we leave this as `task.postCommit(false)` and the state stores aren't flushed during the rebalance, then even if the transaction buffer is already close to its capacity, the overflow will be detected during the next iteration of the run-loop.
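   The worst case can be sketched as a small simulation. Everything here is hypothetical: `DeferredFlushDemo`, `runOnce`, and the fixed 1000-byte limit are stand-ins, and `runOnce` only approximates the buffer check in `StreamThread#maybeCommit`. The buffer sits just under the limit when the rebalance happens, revocation skips the flush (the `postCommit(false)` behaviour), and the next run-loop iteration crosses the limit and flushes.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical simulation of the deferred-flush worst case; not Kafka Streams code. */
   public final class DeferredFlushDemo {

       // Assumed limit for illustration; the -1 "disabled" case is ignored here.
       static final long MAX_UNCOMMITTED_BYTES = 1_000L;

       private long bufferedBytes;
       private final List<String> events = new ArrayList<>();

       /** Models leaving postCommit(false): revocation never forces a flush. */
       void handleRevocation() {
           events.add("revocation: postCommit(false), buffered=" + bufferedBytes);
       }

       /** One run-loop iteration: process records, then check the buffer. */
       void runOnce(final long bytesProcessed) {
           bufferedBytes += bytesProcessed;
           if (bufferedBytes >= MAX_UNCOMMITTED_BYTES) {
               events.add("run-loop: flush, buffered=" + bufferedBytes);
               bufferedBytes = 0L; // a flush empties the transaction buffer
           } else {
               events.add("run-loop: no flush, buffered=" + bufferedBytes);
           }
       }

       List<String> events() {
           return events;
       }

       public static void main(final String[] args) {
           final DeferredFlushDemo demo = new DeferredFlushDemo();
           demo.runOnce(950L);      // close to the limit, but still below it
           demo.handleRevocation(); // rebalance: no records processed, no flush
           demo.runOnce(100L);      // next iteration crosses the limit and flushes
           demo.events().forEach(System.out::println);
       }
   }
   ```

   The only cost of skipping the flush during revocation is that it is deferred by at most one run-loop iteration.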



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.