ableegoldman commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1483722072


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;

Review Comment:
   nit: avoid implicit assumptions about the value of sentinel constants. For example, what if we need to introduce another sentinel value that means something other than unbounded and give it a value of -2? Best to define a static constant like `UNBOUNDED_UNCOMMITTED_BYTES = -1` and then check it like this:
   ```suggestion
           final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes == UNBOUNDED_UNCOMMITTED_BYTES;
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1889,7 +1925,8 @@ private int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> t
         if (rebalanceInProgress) {
             return -1;
         } else {
-            return taskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, consumedOffsetsAndMetadata);
+            final int committedOffsets = taskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, consumedOffsetsAndMetadata);

Review Comment:
   was this change intentional?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    private boolean transactionBuffersExceedCapacity = false;

Review Comment:
   nit: move this up to the top of the class with the other fields



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;

Review Comment:
   And if you define the UNBOUNDED_UNCOMMITTED_BYTES constant in StreamsConfig, perhaps next to the config definition itself, then users can easily discover and use the constant for clearer code, rather than hardcoding "-1" and then coming back a year later wondering what that value means.
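   
   Roughly something like this, just to sketch the idea -- the config key string, class name, and javadoc below are illustrative placeholders, not the actual names from this PR:
   ```java
   // Illustrative sketch only; the config key and wrapper class are placeholders, not the real ones.
   public final class StreamsConfigSketch {

       // defined next to the config definition in StreamsConfig:
       public static final String STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG = "statestore.uncommitted.max.bytes";

       /** Sentinel value meaning "no bound on uncommitted state bytes". */
       public static final long UNBOUNDED_UNCOMMITTED_BYTES = -1L;

       // ...and then the TaskManager check reads as intent rather than as a magic number:
       static boolean transactionBuffersAreUnbounded(final long maxUncommittedStateBytes) {
           return maxUncommittedStateBytes == UNBOUNDED_UNCOMMITTED_BYTES;
       }
   }
   ```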



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold

Review Comment:
   I think this method would benefit from slightly more detailed docs/comments. I'm also a bit concerned about over-optimization, but lmk what you think:
   
   First, IIUC, the idea is to use the number of additional uncommitted bytes since the last commit as a heuristic to estimate how many more uncommitted bytes will be added if we don't commit right now, and thus try to commit "just before" we go over the limit. Makes sense on its face, though it's obviously only as good as how regular/linear the buffer growth is. So the question is: will the buffer grow linearly between commits?
   
   Unfortunately, I suspect the answer is "no" in many cases -- in fact, the only case in which the buffer will grow by the same amount each iteration is when every insert is a new key. Of course, most KV stores have a bounded keyspace (else they'd grow forever), so we expect to see updates to the same keys. The very first iteration after a commit will show the most growth, and that will taper off as we rebuild the keyset and see fewer and fewer new keys vs in-place updates.
   
   Of course it's not incorrect to commit early; I just wanted to get your thoughts on this. Maybe a good middle ground would be to maintain a running average `deltaBytes`? Or even just cut the previous `deltaBytes` in half? Either way it's still just an approximation, and things like maxUncommittedBytes are always just best-effort in the end.
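   
   To make the running-average idea a bit more concrete, here's a rough sketch of what I had in mind -- purely illustrative, the class and names below are made up rather than actual TaskManager code:
   ```java
   // Illustrative only: smooth the observed per-iteration growth of the transaction
   // buffers with an exponentially-weighted running average, and commit early only
   // when one more "typical" iteration's growth would push us past the limit.
   class UncommittedBytesEstimator {
       // weight given to the most recent observation; 0.5 halves the influence of older deltas
       private static final double ALPHA = 0.5;

       private long lastUncommittedBytes = 0L;
       private double smoothedDeltaBytes = 0.0;

       // call once per iteration with the current total uncommitted bytes
       void record(final long currentUncommittedBytes) {
           final long delta = Math.max(0L, currentUncommittedBytes - lastUncommittedBytes);
           smoothedDeltaBytes = ALPHA * delta + (1 - ALPHA) * smoothedDeltaBytes;
           lastUncommittedBytes = currentUncommittedBytes;
       }

       // true if adding another "typical" iteration's worth of growth would cross the configured max
       boolean willLikelyExceed(final long currentUncommittedBytes, final long maxUncommittedBytes) {
           return currentUncommittedBytes + (long) smoothedDeltaBytes > maxUncommittedBytes;
       }
   }
   ```
   The nice property of a weighted average is that the first post-commit iteration, which tends to show the most growth, doesn't dominate the estimate forever.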


