nicktelford commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1485925149
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }

+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold

Review Comment:
   > First, IIUC, the idea is to use the number of additional uncommitted bytes since the last commit as a heuristic to estimate how many more uncommitted bytes will be added if we don't commit right now, and thus try to commit "just before" we go over the limit.

   Correct, this is the intent. One correction, though: we're calculating the delta since the last _iteration_ of the StreamThread, not since the last commit.

   This behaviour was requested on the mailing list, IIRC by @cadonna, to reduce the risk of OOM between commits. The thinking was that users will expect `statestore.uncommitted.max.bytes` to be an upper bound, and will likely set it with little headroom. If the commit interval is fairly long (e.g. 30 seconds under ALOS), then there's a good chance we would exceed this limit by a considerable margin.

   > [...] so we expect to see updates to the same keys. [...] and see fewer and fewer new keys vs in-place updates.

   Writes in RocksDB are the same size, irrespective of whether they are inserts of new keys or updates to existing keys. The only case I can think of where this may be different is when migrating an existing record from a non-timestamped to a timestamped column family; in that case, the updates would actually be slightly larger, due to the additional tombstone written.
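The look-ahead heuristic described above can be sketched as follows. This is an illustrative standalone version, not the actual PR code: the names `shouldCommitEarly`, `uncommittedBytes`, and `bytesSinceLastIteration` are assumptions made for the example, and the real method reads these values from the task manager's state.

```java
// Hypothetical sketch of the "commit just before the limit" heuristic:
// commit early if the current uncommitted bytes, plus the bytes added
// during the *previous* StreamThread iteration, would exceed the
// configured maximum. A negative maximum means "unbounded".
public class CommitHeuristicSketch {

    static boolean shouldCommitEarly(final long maxUncommittedBytes,
                                     final long uncommittedBytes,
                                     final long bytesSinceLastIteration) {
        if (maxUncommittedBytes < 0) {
            return false; // unbounded buffers: never force an early commit
        }
        // assume the next iteration will add roughly as many bytes as the
        // last one did, and commit now if that would breach the limit
        return uncommittedBytes + bytesSinceLastIteration > maxUncommittedBytes;
    }

    public static void main(final String[] args) {
        // 80 buffered + 30 added last iteration > limit of 100: commit now
        System.out.println(shouldCommitEarly(100, 80, 30)); // true
        // 80 + 10 <= 100: enough headroom, no early commit needed
        System.out.println(shouldCommitEarly(100, 80, 10)); // false
        // negative limit: unbounded, never commit early
        System.out.println(shouldCommitEarly(-1, 80, 30)); // false
    }
}
```

Using the last iteration's delta makes the estimate immediately responsive to throughput changes, at the cost of occasionally committing one iteration earlier than strictly necessary.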
   Ultimately, any heuristic we choose will be imperfect. If we track an average, even one taken since the last commit, it would be less responsive to sudden increases in throughput, potentially allowing an OOM before the average has had a chance to catch up. The current approach attempts to err on the side of caution: we may occasionally trigger a commit unnecessarily early, but we should be less likely to exceed the configured buffer size.

   I'm not against changing this heuristic to a running average, or anything else; I'm just not sufficiently convinced it would be better :grin:
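The lag argument against a running average can be illustrated with a small numeric sketch. Everything here is hypothetical: the smoothing factor, the per-iteration byte counts, and the use of an exponentially weighted moving average (EWMA) are assumptions chosen to show the effect, not anything specified by the PR.

```java
import java.util.Locale;

// Illustrative comparison: a smoothed running average reacts slowly to a
// sudden throughput spike, while the raw last-iteration delta tracks it
// immediately. All numbers are made up for the example.
public class AverageLagSketch {

    public static void main(final String[] args) {
        final double alpha = 0.2; // assumed EWMA smoothing factor
        double ewma = 10.0;       // steady state: ~10 MB added per iteration
        long lastDelta = 10;

        // sudden spike: three consecutive iterations each add 50 MB
        for (int i = 0; i < 3; i++) {
            lastDelta = 50;
            ewma = alpha * lastDelta + (1 - alpha) * ewma;
        }

        // the average still under-estimates the new rate, so a limit check
        // based on it would leave less safety margin than the raw delta
        System.out.printf(Locale.ROOT, "ewma=%.1f lastDelta=%d%n", ewma, lastDelta);
        // prints: ewma=29.5 lastDelta=50
    }
}
```

After three spiked iterations the EWMA has only reached about 29.5 MB per iteration while the actual rate is 50 MB, which is the responsiveness gap the comment is describing.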