ableegoldman commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1483722072
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
}
}
+ private boolean transactionBuffersExceedCapacity = false;
+
+ boolean transactionBuffersExceedCapacity() {
+ return transactionBuffersExceedCapacity;
+ }
+
+ boolean transactionBuffersWillExceedCapacity() {
+ final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
Review Comment:
nit: avoid implicit assumptions about the value of sentinel constants. For
example, what if we later need to introduce another sentinel value that means
something other than unbounded, and give it a value of -2? The `< 0` check
would silently treat it as unbounded. Best to define a static constant like
`UNBOUNDED_UNCOMMITTED_BYTES = -1` and then check like this:
```suggestion
final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes == UNBOUNDED_UNCOMMITTED_BYTES;
```
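A minimal sketch of the named-sentinel check in isolation. The class `UncommittedBytesLimit` and the `long` type of the field are assumptions made for illustration and are not taken from the PR; only the constant name and value come from the suggestion above.

```java
// Hypothetical stand-in for the relevant part of TaskManager; not the PR's code.
final class UncommittedBytesLimit {
    // Name and value follow the review suggestion; the final location is undecided.
    static final long UNBOUNDED_UNCOMMITTED_BYTES = -1L;

    // Assumed to be a long holding the configured limit in bytes.
    private final long maxUncommittedStateBytes;

    UncommittedBytesLimit(final long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    // Only the named sentinel disables the limit. A future sentinel such as -2
    // would not be mistaken for "unbounded", unlike with a `< 0` check.
    boolean isUnbounded() {
        return maxUncommittedStateBytes == UNBOUNDED_UNCOMMITTED_BYTES;
    }
}
```

With this form, `new UncommittedBytesLimit(-2L).isUnbounded()` is false, whereas a `< 0` comparison would report it as unbounded.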
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1889,7 +1925,8 @@ private int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> t
if (rebalanceInProgress) {
return -1;
} else {
- return taskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, consumedOffsetsAndMetadata);
+ final int committedOffsets = taskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, consumedOffsetsAndMetadata);
Review Comment:
Was this change intentional?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
}
}
+ private boolean transactionBuffersExceedCapacity = false;
Review Comment:
nit: move this up to the top with the other fields
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
}
}
+ private boolean transactionBuffersExceedCapacity = false;
+
+ boolean transactionBuffersExceedCapacity() {
+ return transactionBuffersExceedCapacity;
+ }
+
+ boolean transactionBuffersWillExceedCapacity() {
+ final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
Review Comment:
And if you define the `UNBOUNDED_UNCOMMITTED_BYTES` constant in StreamsConfig,
perhaps next to the config definition itself, then users can easily discover
it and use it for clearer code, rather than hardcoding "-1" and then
coming back a year later wondering what that value means.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
}
}
+ private boolean transactionBuffersExceedCapacity = false;
+
+ boolean transactionBuffersExceedCapacity() {
+ return transactionBuffersExceedCapacity;
+ }
+
+ boolean transactionBuffersWillExceedCapacity() {
+ final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+ if (transactionBuffersAreUnbounded) {
+ return false;
+ }
+
+ // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold
Review Comment:
I think this method would benefit from slightly more detailed docs/comments.
I'm also a bit concerned about over-optimization, but let me know what you think.
First, IIUC, the idea is to use the number of uncommitted bytes added
since the last commit as a heuristic to estimate how many more uncommitted
bytes will be added if we don't commit right now, and thus try to commit "just
before" we go over the limit. That makes sense on its face, though the estimate
is only as good as the buffer growth is regular/linear. So the question
is: will the buffer grow linearly between commits?
Unfortunately I suspect the answer is "no" in many cases. In fact, the
only case in which the buffer will grow by the same amount each iteration is
when every insert is a new key. Most KV stores have a bounded
keyspace (else they'd grow forever), so we expect to see updates to the same
keys. The very first iteration after a commit will show the most growth, and
that will taper off as we rebuild the keyset and see fewer and fewer new keys
vs in-place updates.
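To illustrate the tapering described above, here is a rough simulation. It assumes a simplified model that is not taken from the PR: keys are drawn uniformly at random from a bounded keyspace, every entry has the same size, and only first-time keys grow the buffer while updates happen in place. The class and parameter names are hypothetical.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical model of a transaction buffer between two commits; not Kafka code.
final class BufferGrowthSimulation {

    // Returns the bytes added to the buffer in each iteration since the last commit.
    static long[] growthPerIteration(final int keyspace,
                                     final int insertsPerIteration,
                                     final int iterations,
                                     final long bytesPerEntry,
                                     final long seed) {
        final Random random = new Random(seed);
        final Set<Integer> bufferedKeys = new HashSet<>();
        final long[] deltas = new long[iterations];
        for (int i = 0; i < iterations; i++) {
            int newKeys = 0;
            for (int j = 0; j < insertsPerIteration; j++) {
                // add() returns true only for keys not yet in the buffer;
                // in this model, updates to buffered keys add no bytes.
                if (bufferedKeys.add(random.nextInt(keyspace))) {
                    newKeys++;
                }
            }
            deltas[i] = newKeys * bytesPerEntry;
        }
        return deltas;
    }

    public static void main(final String[] args) {
        final long[] deltas = growthPerIteration(100, 200, 5, 64L, 42L);
        for (int i = 0; i < deltas.length; i++) {
            System.out.println("iteration " + (i + 1) + ": +" + deltas[i] + " bytes");
        }
    }
}
```

Under this model the total growth can never exceed `keyspace * bytesPerEntry`, so the per-iteration delta has to shrink once most keys are buffered. Extrapolating from the first delta would therefore overestimate later growth.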
Of course it's not incorrect to commit early; I just wanted to get your
thoughts on this. Maybe a good middle ground would be to maintain a running
average of `deltaBytes`? Or even just cut the previous `deltaBytes` in half?
Either way it's still just an approximation, and limits like
`maxUncommittedStateBytes` are always just best-effort in the end.
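One way the running-average idea could look, as a sketch only. `DeltaBytesEstimator`, its methods, and the smoothing factor `alpha` are hypothetical names, not part of the PR; the average is an exponentially weighted one, and the `-1` sentinel for "unbounded" is assumed from the discussion above.

```java
// Hypothetical helper illustrating a smoothed deltaBytes; not the PR's implementation.
final class DeltaBytesEstimator {
    private static final long UNBOUNDED_UNCOMMITTED_BYTES = -1L; // assumed sentinel

    private final double alpha;              // weight of the newest observation, in (0, 1]
    private double averageDeltaBytes = 0.0;  // smoothed growth per iteration
    private long lastUncommittedBytes = 0L;  // buffer size seen at the last observation
    private boolean hasObservation = false;

    DeltaBytesEstimator(final double alpha) {
        if (alpha <= 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        this.alpha = alpha;
    }

    // Record the current uncommitted bytes and fold the growth into the average.
    void observe(final long uncommittedBytes) {
        final long delta = Math.max(0L, uncommittedBytes - lastUncommittedBytes);
        averageDeltaBytes = hasObservation
            ? alpha * delta + (1.0 - alpha) * averageDeltaBytes
            : delta;
        hasObservation = true;
        lastUncommittedBytes = uncommittedBytes;
    }

    // After a commit the buffer is empty; the average is deliberately kept.
    void onCommit() {
        lastUncommittedBytes = 0L;
    }

    // True if one more iteration of average growth would push us over the limit.
    boolean willExceed(final long maxUncommittedStateBytes) {
        if (maxUncommittedStateBytes == UNBOUNDED_UNCOMMITTED_BYTES) {
            return false;
        }
        return lastUncommittedBytes + averageDeltaBytes > maxUncommittedStateBytes;
    }
}
```

With `alpha = 0.5`, observing 100 and then 140 uncommitted bytes gives an average delta of 70, so `willExceed(200)` is true (140 + 70 > 200) while `willExceed(250)` is false. Setting `alpha = 1.0` degenerates to using only the most recent delta; whether the average should be reset on commit is an open design choice.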