nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1485925149


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold

Review Comment:
   > First, IIUC, the idea is to use the number of additional uncommitted bytes since the last commit as a heuristic to estimate how many more uncommitted bytes will be added if we don't commit right now, and thus try to commit "just before" we go over the limit.
   
   Correct, this is the intent. One correction, though: we're calculating the delta since the last _iteration_ of the StreamThread, not since the last commit.
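   To make the intent concrete, here's a rough sketch of the check (the field and parameter names below are illustrative placeholders, not the exact ones in this PR):

   ```java
   // Illustrative only: force an early commit if, based on the bytes written during the
   // last StreamThread iteration, the next iteration would likely push us over the limit.
   boolean transactionBuffersWillExceedCapacity(final long uncommittedBytes,
                                                final long bytesWrittenLastIteration,
                                                final long maxUncommittedStateBytes) {
       if (maxUncommittedStateBytes < 0) {
           return false; // unbounded buffers: never force an early commit
       }
       return uncommittedBytes + bytesWrittenLastIteration >= maxUncommittedStateBytes;
   }
   ```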
   
   This behaviour was requested on the mailing list, IIRC by @cadonna, to reduce the risk of an OOM between commits. The thinking was that users will expect `statestore.uncommitted.max.bytes` to be an upper bound, and will likely set it with little headroom. If the commit interval is fairly long (e.g. 30 seconds under ALOS), then there's a good chance we would exceed this limit by a considerable margin.
   
   > [...] so we expect to see updates to the same keys. [...] and see fewer and fewer new keys vs in-place updates.
   
   Writes in RocksDB are the same size, irrespective of whether they are inserts of new keys or updates to existing keys. The only case I can think of where this may differ is when migrating an existing record from a non-timestamped to a timestamped column family, in which case the updates would actually be slightly larger, due to the additional tombstone written.
   
   Ultimately, any heuristic we choose will be imperfect. If we track an average, even one measured only since the last commit, it would be less responsive to sudden increases in throughput, potentially allowing an OOM before the average has had a chance to catch up. The current approach errs on the side of caution: we may occasionally trigger a commit unnecessarily early, but we should be less likely to exceed the configured buffer size.
   
   I'm not against changing this heuristic to a running average, or anything else; I'm just not sufficiently convinced it would be better :grin:
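   For comparison, the difference between the two estimators would look roughly like this (purely illustrative, made-up names):

   ```java
   // Current approach: assume the next iteration writes about as much as the last one did.
   // Reacts immediately to a throughput spike, at the cost of occasionally committing early.
   long estimateUsingLastIteration(final long uncommittedBytes,
                                   final long bytesWrittenLastIteration) {
       return uncommittedBytes + bytesWrittenLastIteration;
   }

   // Alternative: average the bytes written per iteration since the last commit.
   // Smoother, but lags behind a sudden increase in write volume, so it may
   // underestimate and let us exceed the configured limit before catching up.
   long estimateUsingRunningAverage(final long uncommittedBytes,
                                    final long bytesWrittenSinceLastCommit,
                                    final long iterationsSinceLastCommit) {
       final long averagePerIteration = iterationsSinceLastCommit == 0
           ? 0
           : bytesWrittenSinceLastCommit / iterationsSinceLastCommit;
       return uncommittedBytes + averagePerIteration;
   }
   ```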


