cadonna commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1471044772


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -874,6 +879,12 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.MEDIUM,
                     STATESTORE_CACHE_MAX_BYTES_DOC)
+            .define(STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG,
+                    Type.LONG,
+                    64 * 1024 * 1024L,
+                    atLeast(-1),
+                    Importance.MEDIUM,
+                    STATESTORE_UNCOMMITTED_MAX_BYTES_DOC)

Review Comment:
   Could you please add unit tests for this config in `StreamsConfigTest`? 
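   For what it's worth, a standalone mimic of the bounds such a test should pin down (sketch only; these names are illustrative, not the real `StreamsConfigTest` scaffolding or `ConfigDef` API):

```java
// Sketch of what the config unit test should cover for
// statestore.uncommitted.max.bytes: the default value and the atLeast(-1)
// lower bound, where -1 means "no limit". Standalone mimic, not Kafka code.
public class UncommittedBytesConfigSketch {
    static final long DEFAULT_UNCOMMITTED_MAX_BYTES = 64 * 1024 * 1024L;
    static final long MIN_VALUE = -1L; // -1 disables the limit

    // mirrors the semantics of a ConfigDef atLeast(-1) validator
    static boolean isValid(final long value) {
        return value >= MIN_VALUE;
    }

    public static void main(final String[] args) {
        // the default must itself pass validation
        if (!isValid(DEFAULT_UNCOMMITTED_MAX_BYTES)) throw new AssertionError();
        // -1 (unbounded) is allowed, anything below is rejected
        if (!isValid(-1L)) throw new AssertionError();
        if (isValid(-2L)) throw new AssertionError();
    }
}
```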



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########


Review Comment:
   What do you think about moving the `STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG` to the `InternalConfig` and not exposing this config in the public API for now? Then, when we have a complete implementation, we can expose it publicly.
   This would avoid exposing unfinished work.
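   Roughly along these lines (a sketch only; the actual `InternalConfig` conventions may differ, and the `__...__` name here is hypothetical):

```java
public static class InternalConfig {
    // kept internal until the feature is complete; name is illustrative
    public static final String STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG =
        "__statestore.uncommitted.max.bytes__";
}
```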



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -141,6 +141,7 @@ private int processTask(final Task task, final int maxNumRecords, final long beg
     int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
                                                     final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
         int committed = 0;
+        final boolean enfoceCheckpoint = taskManager.needsCommit(false);

Review Comment:
   typo:
   ```suggestion
           final boolean enforceCheckpoint = taskManager.needsCommit(false);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
             if (task.commitNeeded()) {
                 task.clearTaskTimeout();
                 ++committed;
-                task.postCommit(false);
+                // under EOS, we need to enforce a checkpoint if our transaction buffers will exceeded their capacity

Review Comment:
   Shouldn't our transaction buffers be empty at this point?
   We committed the transaction, which means we should have also committed the state store.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1374,7 +1376,7 @@ public void signalResume() {
      */
     int maybeCommit() {
         final int committed;
-        if (now - lastCommitMs > commitTimeMs) {
+        if (taskManager.needsCommit(true) || now - lastCommitMs > commitTimeMs) {

Review Comment:
   I also could not find test code for this.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -438,4 +438,11 @@ public Map<TopicPartition, Long> changelogOffsets() {
     public final String changelogFor(final String storeName) {
         return storeToChangelogTopic.get(storeName);
     }
+
+    @Override
+    public long approximateNumUncommittedBytes() {
+        return globalStores.values().stream()
+            .map(optional -> optional.map(StateStore::approximateNumUncommittedBytes).orElse(0L))
+            .reduce(0L, Long::sum);
+    }

Review Comment:
   I cannot find a unit test for this.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;
+
+    boolean needsCommit(final boolean updateDelta) {
+        if (maxUncommittedStateBytes < 0) {
+            // if our transaction buffers are unbounded, we never need to force an early commit
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold
+        final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
+
+        final boolean needsCommit =  maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes;

Review Comment:
   Additionally, nit:
   ```suggestion
        final boolean needsCommit = maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
   ```
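   For context, the prediction amounts to the following standalone sketch (names are illustrative; the `updateDelta` bookkeeping is not shown in the quoted hunk, so the update step here is an assumption):

```java
// Standalone sketch of the early-commit prediction: force a commit when the
// current uncommitted bytes plus the last iteration's growth would exceed
// the configured bound. Not Kafka code; approximateUncommittedStateBytes is
// replaced by a plain parameter.
public class NeedsCommitSketch {
    private final long maxUncommittedStateBytes;
    private long lastUncommittedBytes = 0L;

    NeedsCommitSketch(final long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    boolean needsCommit(final long uncommittedBytes, final boolean updateDelta) {
        if (maxUncommittedStateBytes < 0) {
            return false; // unbounded buffers: never force an early commit
        }
        // assume the buffer grows next iteration by as much as it grew this one
        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
        final boolean needsCommit = uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
        if (updateDelta) {
            lastUncommittedBytes = uncommittedBytes;
        }
        return needsCommit;
    }
}
```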



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;
+
+    boolean needsCommit(final boolean updateDelta) {
+        if (maxUncommittedStateBytes < 0) {
+            // if our transaction buffers are unbounded, we never need to force an early commit
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold
+        final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
+
+        final boolean needsCommit =  maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes;

Review Comment:
   Do we really need `maxUncommittedStateBytes > -1`? Isn't this always true once we pass line 1851?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;
+
+    boolean needsCommit(final boolean updateDelta) {
+        if (maxUncommittedStateBytes < 0) {
+            // if our transaction buffers are unbounded, we never need to force an early commit
+            return false;
+        }

Review Comment:
   nit:
   ```suggestion
        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
           if (transactionBuffersAreUnbounded) {
               return false;
           }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
             if (task.commitNeeded()) {
                 task.clearTaskTimeout();
                 ++committed;
-                task.postCommit(false);
+                // under EOS, we need to enforce a checkpoint if our transaction buffers will exceeded their capacity
+                task.postCommit(enfoceCheckpoint);

Review Comment:
   I could not find tests for this change.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1097,10 +1102,10 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             for (final Task task : commitNeededActiveTasks) {
                 if (!dirtyTasks.contains(task)) {
                     try {
-                        // for non-revoking active tasks, we should not enforce checkpoint
-                        // since if it is EOS enabled, no checkpoint should be written while
-                        // the task is in RUNNING tate
+                        // we only enforce a checkpoint if the transaction buffers are full
+                        // to avoid unnecessary flushing of stores under EOS
+                        final boolean enforceCheckpoint = maxUncommittedStateBytes > -1 && tasks.approximateUncommittedStateBytes() >= maxUncommittedStateBytes;
+                        task.postCommit(enforceCheckpoint);

Review Comment:
   Also here, the tasks should have been committed. Why are the transaction buffers not empty?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;

Review Comment:
   Please move this field to the other fields at the top of the class.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -754,4 +754,11 @@ public void deleteCheckPointFileIfEOSEnabled() throws IOException {
             checkpointFile.delete();
         }
     }
+
+    @Override
+    public long approximateNumUncommittedBytes() {
+        return stores.values().stream()
+                .map(metadata -> metadata.store().approximateNumUncommittedBytes())
+                .reduce(0L, Long::sum);
+    }

Review Comment:
   I cannot find a unit test for this.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -505,6 +505,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
     public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
 
+    public static final String STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG = "statestore.uncommitted.max.bytes";
+    public static final String STATESTORE_UNCOMMITTED_MAX_BYTES_DOC = "Maximum number of memory bytes to be used to buffer uncommitted state-store records. " +
+            "If this limit is exceeded, a task commit will be requested. No limit: -1. " +
+            "Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors.";

Review Comment:
   I would not explicitly mention RocksDB here since this config is applicable to any state store that supports transactions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org