cadonna commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1471044772
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########

```diff
@@ -874,6 +879,12 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.MEDIUM,
                     STATESTORE_CACHE_MAX_BYTES_DOC)
+            .define(STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG,
+                    Type.LONG,
+                    64 * 1024 * 1024L,
+                    atLeast(-1),
+                    Importance.MEDIUM,
+                    STATESTORE_UNCOMMITTED_MAX_BYTES_DOC)
```

Review Comment:
   Could you please add unit tests for this config in `StreamsConfigTest`?

##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########

Review Comment:
   What do you think about moving the `STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG` to the `InternalConfig` and not exposing this config in the public API for now? Then, when we have a complete implementation, we can expose it publicly. This would avoid exposing unfinished work.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########

```diff
@@ -141,6 +141,7 @@ private int processTask(final Task task, final int maxNumRecords, final long beg
     int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
                                                     final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
         int committed = 0;
+        final boolean enfoceCheckpoint = taskManager.needsCommit(false);
```

Review Comment:
   typo:
   ```suggestion
           final boolean enforceCheckpoint = taskManager.needsCommit(false);
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########

```diff
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
                 if (task.commitNeeded()) {
                     task.clearTaskTimeout();
                     ++committed;
-                    task.postCommit(false);
+                    // under EOS, we need to enforce a checkpoint if our transaction buffers will exceeded their capacity
```

Review Comment:
   Shouldn't our transaction buffers be empty at this point? We committed the transaction, which means we should have also committed the state store.
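As a starting point for the requested `StreamsConfigTest` coverage, the following is a rough, standalone sketch of what such a test might pin down: the 64 MiB default and the `atLeast(-1)` lower bound. The class and `isValid` helper are illustrative stand-ins, not Kafka's `ConfigDef` API.

```java
// Illustrative, standalone sketch of what a StreamsConfigTest case for
// statestore.uncommitted.max.bytes might assert; isValid() mimics the
// behavior of ConfigDef's atLeast(-1) validator and is not Kafka code.
public class UncommittedBytesConfigSketch {
    static final long DEFAULT_MAX_UNCOMMITTED_BYTES = 64 * 1024 * 1024L;

    // atLeast(-1): any value >= -1 is accepted, where -1 means "no limit"
    static boolean isValid(final long value) {
        return value >= -1L;
    }

    public static void main(final String[] args) {
        if (DEFAULT_MAX_UNCOMMITTED_BYTES != 67108864L) throw new AssertionError("default should be 64 MiB");
        if (!isValid(-1L)) throw new AssertionError("-1 (unbounded) must be accepted");
        if (!isValid(0L)) throw new AssertionError("0 is within atLeast(-1)");
        if (isValid(-2L)) throw new AssertionError("values below -1 must be rejected");
        System.out.println("ok");
    }
}
```

A real test would of course construct a `StreamsConfig` and read the parsed value back, rather than duplicating the validator.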
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########

```diff
@@ -1374,7 +1376,7 @@ public void signalResume() {
      */
     int maybeCommit() {
         final int committed;
-        if (now - lastCommitMs > commitTimeMs) {
+        if (taskManager.needsCommit(true) || now - lastCommitMs > commitTimeMs) {
```

Review Comment:
   I also could not find test code for this.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########

```diff
@@ -438,4 +438,11 @@ public Map<TopicPartition, Long> changelogOffsets() {
     public final String changelogFor(final String storeName) {
         return storeToChangelogTopic.get(storeName);
     }
+
+    @Override
+    public long approximateNumUncommittedBytes() {
+        return globalStores.values().stream()
+            .map(optional -> optional.map(StateStore::approximateNumUncommittedBytes).orElse(0L))
+            .reduce(0L, Long::sum);
+    }
```

Review Comment:
   I cannot find a unit test for this.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########

```diff
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;
+
+    boolean needsCommit(final boolean updateDelta) {
+        if (maxUncommittedStateBytes < 0) {
+            // if our transaction buffers are unbounded, we never need to force an early commit
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold
+        final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
+
+        final boolean needsCommit = maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
```

Review Comment:
   Additionally, nit:
   ```suggestion
           final boolean needsCommit = maxUncommittedStateBytes > -1
               && uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########

```diff
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;
+
+    boolean needsCommit(final boolean updateDelta) {
+        if (maxUncommittedStateBytes < 0) {
+            // if our transaction buffers are unbounded, we never need to force an early commit
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold
+        final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
+
+        final boolean needsCommit = maxUncommittedStateBytes > -1 && uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
```

Review Comment:
   Do we really need `maxUncommittedStateBytes > -1`? Isn't this always true once we pass line 1851?
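To make the prediction in the `needsCommit()` hunk above concrete, here is a standalone sketch of the same logic with the byte count passed in directly so it can be exercised without `TaskManager`. The hunk does not show how `updateDelta` is used, so this sketch assumes it refreshes `lastUncommittedBytes`; that part is an inference, not quoted from the PR.

```java
// Standalone sketch of the needsCommit() prediction from the PR diff.
// The growth since the last check (deltaBytes) is used to estimate next
// iteration's usage, so a commit is forced one iteration early.
public class NeedsCommitSketch {
    private final long maxUncommittedStateBytes;
    private long lastUncommittedBytes = 0L;

    NeedsCommitSketch(final long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    boolean needsCommit(final long uncommittedBytes, final boolean updateDelta) {
        if (maxUncommittedStateBytes < 0) {
            // unbounded transaction buffers never force an early commit
            return false;
        }
        // predict next iteration's usage from the growth since the last check
        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
        final boolean needsCommit = uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
        if (updateDelta) {
            // assumption: updateDelta controls whether the baseline advances
            lastUncommittedBytes = uncommittedBytes;
        }
        return needsCommit;
    }

    public static void main(final String[] args) {
        if (new NeedsCommitSketch(-1L).needsCommit(Long.MAX_VALUE / 2, true))
            throw new AssertionError("unbounded buffers must never force a commit");

        final NeedsCommitSketch bounded = new NeedsCommitSketch(100L);
        if (bounded.needsCommit(40L, true))
            throw new AssertionError("40 + delta 40 = 80 <= 100: no commit yet");
        if (!bounded.needsCommit(80L, true))
            throw new AssertionError("80 + delta 40 = 120 > 100: commit early");
        System.out.println("ok");
    }
}
```

This also illustrates the question raised above: once the `maxUncommittedStateBytes < 0` guard has returned, the `maxUncommittedStateBytes > -1` term in the final expression is indeed always true.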
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########

```diff
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;
+
+    boolean needsCommit(final boolean updateDelta) {
+        if (maxUncommittedStateBytes < 0) {
+            // if our transaction buffers are unbounded, we never need to force an early commit
+            return false;
+        }
```

Review Comment:
   nit:
   ```suggestion
           final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
           if (transactionBuffersAreUnbounded) {
               return false;
           }
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########

```diff
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
                 if (task.commitNeeded()) {
                     task.clearTaskTimeout();
                     ++committed;
-                    task.postCommit(false);
+                    // under EOS, we need to enforce a checkpoint if our transaction buffers will exceeded their capacity
+                    task.postCommit(enfoceCheckpoint);
```

Review Comment:
   I could not find tests for this change.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########

```diff
@@ -1097,10 +1102,10 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         for (final Task task : commitNeededActiveTasks) {
             if (!dirtyTasks.contains(task)) {
                 try {
-                    // for non-revoking active tasks, we should not enforce checkpoint
-                    // since if it is EOS enabled, no checkpoint should be written while
-                    // the task is in RUNNING tate
-                    task.postCommit(false);
+                    // we only enforce a checkpoint if the transaction buffers are full
+                    // to avoid unnecessary flushing of stores under EOS
+                    final boolean enforceCheckpoint = maxUncommittedStateBytes > -1 && tasks.approximateUncommittedStateBytes() >= maxUncommittedStateBytes;
+                    task.postCommit(enforceCheckpoint);
```

Review Comment:
   Also here, the tasks should have been committed. Why are transaction buffers not empty?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########

```diff
@@ -1836,6 +1841,33 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    // track the size of the transaction buffer on each iteration to predict when it will be exceeded in advance
+    private long lastUncommittedBytes = 0L;
```

Review Comment:
   Please move this field to the other fields at the top of the class.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########

```diff
@@ -754,4 +754,11 @@ public void deleteCheckPointFileIfEOSEnabled() throws IOException {
             checkpointFile.delete();
         }
     }
+
+    @Override
+    public long approximateNumUncommittedBytes() {
+        return stores.values().stream()
+            .map(metadata -> metadata.store().approximateNumUncommittedBytes())
+            .reduce(0L, Long::sum);
+    }
```

Review Comment:
   I cannot find a unit test for this.
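For the missing unit tests on the two `approximateNumUncommittedBytes()` overrides, a test would essentially pin down the Optional-aware stream-and-reduce shown in the hunks. Here is a toy model of that summation with a `StubStore` record standing in for `StateStore` (which is not reproduced here); the real test would register mock stores with the state manager instead.

```java
import java.util.List;
import java.util.Optional;

// Toy model of the summation added to GlobalStateManagerImpl and
// ProcessorStateManager: sum uncommitted bytes across stores, treating
// an absent (Optional.empty) store as contributing 0 bytes.
public class UncommittedBytesSumSketch {
    record StubStore(long approximateNumUncommittedBytes) {}

    static long approximateNumUncommittedBytes(final List<Optional<StubStore>> stores) {
        return stores.stream()
            .map(optional -> optional.map(StubStore::approximateNumUncommittedBytes).orElse(0L))
            .reduce(0L, Long::sum);
    }

    public static void main(final String[] args) {
        final List<Optional<StubStore>> stores = List.of(
            Optional.of(new StubStore(10L)),
            Optional.empty(),                 // an absent store contributes 0
            Optional.of(new StubStore(32L))
        );
        if (approximateNumUncommittedBytes(stores) != 42L)
            throw new AssertionError("expected 10 + 0 + 32 = 42");
        if (approximateNumUncommittedBytes(List.of()) != 0L)
            throw new AssertionError("no stores means 0 uncommitted bytes");
        System.out.println("ok");
    }
}
```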
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########

```diff
@@ -505,6 +505,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
     public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
 
+    public static final String STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG = "statestore.uncommitted.max.bytes";
+    public static final String STATESTORE_UNCOMMITTED_MAX_BYTES_DOC = "Maximum number of memory bytes to be used to buffer uncommitted state-store records. " +
+        "If this limit is exceeded, a task commit will be requested. No limit: -1. " +
+        "Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors.";
```

Review Comment:
   I would not explicitly mention RocksDB here since this config is applicable to any state store that supports transactions.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org