ableegoldman commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1483722072
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
             }
         }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;

Review Comment:
   nit: avoid implicit assumptions about the value of sentinel constants. For example, what if we need to introduce another sentinel value that means something other than "unbounded" and give it a value of -2? It's best to define a static constant like `UNBOUNDED_UNCOMMITTED_BYTES = -1` and then check like this:
   ```suggestion
           final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes == UNBOUNDED_UNCOMMITTED_BYTES;
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1889,7 +1925,8 @@ private int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> t
         if (rebalanceInProgress) {
             return -1;
         } else {
-            return taskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, consumedOffsetsAndMetadata);
+            final int committedOffsets = taskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, consumedOffsetsAndMetadata);

Review Comment:
   Was this change intentional?
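The named-sentinel suggestion in the first review comment can be sketched roughly as follows. Only `UNBOUNDED_UNCOMMITTED_BYTES` comes from the review; the `UncommittedBytesLimit` class name and the assumption that the limit is a `long` are hypothetical, for illustration only.

```java
// Hypothetical sketch: only UNBOUNDED_UNCOMMITTED_BYTES comes from the review;
// the class name and the long type of the limit are assumptions.
final class UncommittedBytesLimit {

    /** Named sentinel meaning "no limit on uncommitted state bytes". */
    static final long UNBOUNDED_UNCOMMITTED_BYTES = -1L;

    private final long maxUncommittedStateBytes;

    UncommittedBytesLimit(final long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    boolean isUnbounded() {
        // Compare against the named constant rather than testing the sign,
        // so a future sentinel such as -2 is not silently treated as "unbounded".
        return maxUncommittedStateBytes == UNBOUNDED_UNCOMMITTED_BYTES;
    }

    public static void main(final String[] args) {
        System.out.println(new UncommittedBytesLimit(UNBOUNDED_UNCOMMITTED_BYTES).isUnbounded()); // true
        System.out.println(new UncommittedBytesLimit(64L * 1024 * 1024).isUnbounded());           // false
        System.out.println(new UncommittedBytesLimit(-2L).isUnbounded());                         // false
    }
}
```

With the `< 0` check, the hypothetical `-2` sentinel would have been reported as unbounded; with the `==` check it is not, which is the failure mode the comment warns about.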
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
             }
         }
 
+    private boolean transactionBuffersExceedCapacity = false;

Review Comment:
   nit: move this up to the top, with the other fields.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
             }
         }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;

Review Comment:
   And if you define the UNBOUNDED_UNCOMMITTED_BYTES constant in StreamsConfig, perhaps next to the config definition itself, then users can easily discover it and use it to write clearer code, rather than hardcoding "-1" and then coming back a year later wondering what that value means.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
             }
         }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold

Review Comment:
   I think this method would benefit from slightly more detailed docs/comments.
   
   I'm also a bit concerned about over-optimization, but lmk what you think:
   
   First, IIUC, the idea is to use the number of additional uncommitted bytes since the last commit as a heuristic to estimate how many more uncommitted bytes will be added if we don't commit right now, and thus try to commit "just before" we go over the limit. That makes sense on its face, though it's obviously only as good as the buffer's growth is regular/linear.
   
   So the question is: will the buffer grow linearly between commits? Unfortunately, I suspect the answer is "no" in many cases. In fact, the only case in which the buffer will grow by the same amount each iteration is when every insert is a new key. But most KV stores have a bounded keyspace (otherwise they'd grow forever), so we expect to see updates to the same keys. The very first iteration after a commit will show the most growth, and that growth will taper off as we rebuild the keyset and see fewer new keys relative to in-place updates.
   
   Of course, it's not incorrect to commit early; I just wanted to get your thoughts on this. Maybe a good middle ground would be to maintain a running average of `deltaBytes`? Or even just cut the previous `deltaBytes` in half? Either way it's still just an approximation, and things like maxUncommittedBytes are always just best-effort in the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above
to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
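The growth-estimation trade-off raised in the last review comment can be compared with a small standalone sketch. It is not the PR's code: every name here is hypothetical, it assumes `deltaBytes` means the growth between two consecutive checks, and it uses an exponential moving average as one possible form of the "running average" the comment mentions (the 0.5 weight is arbitrary).

```java
// Hypothetical sketch (not the PR's code): three ways to estimate how much the
// uncommitted-bytes buffer will grow before the next commit check.
// Assumes "delta" is the growth between two consecutive checks.
final class UncommittedGrowthEstimator {

    enum Strategy {
        LAST_DELTA,      // assume the next delta equals the previous one
        HALF_LAST_DELTA, // discount the previous delta by half
        RUNNING_AVERAGE  // exponential moving average of deltas
    }

    private final long maxUncommittedBytes;
    private final Strategy strategy;
    private final double alpha; // weight of the newest delta in the moving average

    private long previousBytes = 0L;
    private long lastDelta = 0L;
    private double averageDelta = 0.0;
    private boolean hasAverage = false;

    UncommittedGrowthEstimator(final long maxUncommittedBytes, final Strategy strategy, final double alpha) {
        this.maxUncommittedBytes = maxUncommittedBytes;
        this.strategy = strategy;
        this.alpha = alpha;
    }

    /** Estimated growth before the next check; never negative. */
    double expectedGrowth() {
        final double estimate;
        switch (strategy) {
            case LAST_DELTA:
                estimate = lastDelta;
                break;
            case HALF_LAST_DELTA:
                estimate = lastDelta / 2.0;
                break;
            default:
                estimate = averageDelta;
                break;
        }
        return Math.max(0.0, estimate);
    }

    /** Records the current buffer size and returns true if we should commit now. */
    boolean shouldCommit(final long uncommittedBytes) {
        lastDelta = uncommittedBytes - previousBytes;
        previousBytes = uncommittedBytes;
        if (hasAverage) {
            averageDelta = alpha * lastDelta + (1.0 - alpha) * averageDelta;
        } else {
            averageDelta = lastDelta;
            hasAverage = true;
        }
        // Commit if we are already at the limit or are likely to go over it.
        return uncommittedBytes >= maxUncommittedBytes
            || uncommittedBytes + expectedGrowth() > maxUncommittedBytes;
    }

    /** After a commit the buffer is empty again; the delta history is kept. */
    void onCommit() {
        previousBytes = 0L;
    }

    public static void main(final String[] args) {
        // Tapering growth within one commit interval: deltas of 550, 100 and 50 bytes.
        // Prints each strategy's decision at every check (without actually committing).
        final long[] sizes = {550L, 650L, 700L};
        for (final Strategy s : Strategy.values()) {
            final UncommittedGrowthEstimator e = new UncommittedGrowthEstimator(1000L, s, 0.5);
            final StringBuilder decisions = new StringBuilder(s + ":");
            for (final long size : sizes) {
                decisions.append(' ').append(e.shouldCommit(size));
            }
            System.out.println(decisions);
        }
    }
}
```

With a 1000-byte limit and a large first delta, `LAST_DELTA` asks for a commit at the first check (550 + 550 > 1000) even though the buffer is only 55% full, while `HALF_LAST_DELTA` does not (550 + 275 < 1000). A moving average that carries across commits would also dampen that first-iteration spike in later intervals, at the cost of reacting more slowly when the growth pattern changes.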