cadonna commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1474135551
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1343,6 +1348,17 @@ private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
 
+    private long getMaxUncommittedBytesPerThread(final int numStreamThreads) {

Review Comment:
   This method does not consider whether `default.state.isolation.level` is set to `READ_COMMITTED`. Of course, as long as we do not have `default.state.isolation.level`, we cannot consider it. Does passing `maxUncommittedBytesPerThread` already affect commit behavior? As far as I understand the code, it does not, because `approximateNumUncommittedBytes()` always returns 0 at the moment. Would it be better to introduce `default.state.isolation.level` first, before we go on with this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,30 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    boolean needsCommit(final boolean updateDelta) {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceed or are *likely to exceed* the configured threshold
+        final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
+
+        final boolean needsCommit = uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
+        if (needsCommit) {
+            log.debug(
+                "Needs commit because we will exceed max uncommitted bytes before next commit. max: {}, last: {}, current: {}, delta: {}",
+                    maxUncommittedStateBytes, lastUncommittedBytes, uncommittedBytes, deltaBytes
+            );

Review Comment:
   nit:
   ```suggestion
               log.debug(
                   "Needs commit because we will exceed max uncommitted bytes before next commit. max: {}, last: {}, current: {}, delta: {}",
                   maxUncommittedStateBytes, lastUncommittedBytes, uncommittedBytes, deltaBytes
               );
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
             if (task.commitNeeded()) {
                 task.clearTaskTimeout();
                 ++committed;
-                task.postCommit(false);
+                // under EOS, we need to enforce a checkpoint if our transaction buffers will exceed their capacity

Review Comment:
   I am also missing unit tests for this case.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java:
##########
@@ -152,6 +153,13 @@ public List<S> allSegments(final boolean forward) {
         return result;
     }
 
+    @Override
+    public long approximateNumUncommittedBytes() {

Review Comment:
   I could not find unit tests for this.
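[Editor's note: the early-commit check in the `TaskManager#needsCommit` diff above extrapolates the next uncommitted-bytes total from the growth observed since the last check. Below is a minimal, self-contained sketch of that heuristic; the class and all names are illustrative, not the actual `TaskManager` code, and the `updateDelta` flag from the diff is omitted for brevity.]

```java
// Minimal illustration of the early-commit heuristic from the diff above.
// All names are illustrative; this is NOT the actual TaskManager code.
public class UncommittedBytesHeuristic {
    private final long maxUncommittedBytes; // a negative value means unbounded
    private long lastUncommittedBytes = 0L;

    public UncommittedBytesHeuristic(final long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    /**
     * Returns true if the uncommitted bytes exceed, or are projected to exceed
     * by the time of the next check, the configured threshold.
     */
    public boolean needsCommit(final long currentUncommittedBytes) {
        if (maxUncommittedBytes < 0) {
            return false; // unbounded transaction buffers: never force an early commit
        }
        // growth since the last check; shrinkage (e.g. right after a commit) counts as 0
        final long deltaBytes = Math.max(0, currentUncommittedBytes - lastUncommittedBytes);
        lastUncommittedBytes = currentUncommittedBytes;
        // project one more interval of the same growth and compare to the threshold
        return currentUncommittedBytes + deltaBytes > maxUncommittedBytes;
    }

    public static void main(final String[] args) {
        final UncommittedBytesHeuristic h = new UncommittedBytesHeuristic(100L);
        System.out.println(h.needsCommit(40L)); // false: 40 + 40 <= 100
        System.out.println(h.needsCommit(60L)); // false: 60 + 20 <= 100
        System.out.println(h.needsCommit(90L)); // true:  90 + 30 > 100
    }
}
```

[The projection makes the commit fire before the threshold is actually crossed rather than after, at the cost of occasionally committing earlier than strictly necessary.]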
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1320,6 +1320,19 @@ public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
         assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
     }
 
+    @Test
+    public void testStateStoreMaxUncommittedBytesShouldAllowUnbounded() {
+        props.put(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG, -1);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(Long.valueOf(-1), config.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG));
+    }
+
+    @Test
+    public void shouldUseDefaultStateStoreMaxUncommittedBytesConfigWhenNoConfigIsSet() {
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(Long.valueOf(64 * 1024 * 1024), config.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG));
+    }
+

Review Comment:
   I think you need at least two more tests here: one that verifies that you can set positive values, and one that verifies that an exception is thrown when you set a value less than -1.
   
   I am also wondering whether we should allow 0. At the moment, setting this config to 0 does not make sense to me. Maybe you should also write a test for the expected behavior at 0. In the end, unit tests should also document the code.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########

Review Comment:
   There are ways to remember: you could tie a knot in your handkerchief, or you could create a sub-ticket under KAFKA-14412. I leave it to you which option you use. 🙂



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1374,7 +1376,7 @@ public void signalResume() {
      */
     int maybeCommit() {
         final int committed;
-        if (now - lastCommitMs > commitTimeMs) {
+        if (taskManager.needsCommit(true) || now - lastCommitMs > commitTimeMs) {

Review Comment:
   We need both tests. The one in `StreamThreadTest` verifies that the commit is or is not triggered depending on the value returned from `needsCommit()`, and the one in `TaskManagerTest` verifies that `needsCommit()` returns the expected boolean value.
   
   Apropos `TaskManagerTest`: I am missing a unit test for `needsCommit()` there. Good that you are already working on it.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -404,4 +404,11 @@ public synchronized Map<TaskId, Task> allTasksPerId() {
     public boolean contains(final TaskId taskId) {
        return getTask(taskId) != null;
    }
+
+    @Override
+    public long approximateUncommittedStateBytes() {

Review Comment:
   I could not find unit tests for this.
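[Editor's note: to make the two extra tests requested for `StreamsConfigTest` above concrete, they could look roughly like the sketch below. It follows the style of the tests in the diff; `props` and the JUnit/Kafka imports (including `org.apache.kafka.common.config.ConfigException` and an `assertThrows` from the JUnit version the class already uses) are assumed to match the existing test class, and the lower bound of -1 assumes the `atLeast(-1)` validator shown in the `StreamsConfig` hunk later in this message.]

```java
    @Test
    public void shouldAllowPositiveStateStoreMaxUncommittedBytes() {
        // positive values should be accepted as-is
        props.put(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
        final StreamsConfig config = new StreamsConfig(props);
        assertEquals(Long.valueOf(10 * 1024 * 1024), config.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG));
    }

    @Test
    public void shouldThrowOnStateStoreMaxUncommittedBytesBelowMinusOne() {
        // atLeast(-1) in the ConfigDef should reject any value smaller than -1
        props.put(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG, -2);
        assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }
```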
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1073,6 +1078,35 @@ int commit(final Collection<Task> tasksToCommit) {
         assertTrue(committed.get());
     }
 
+    @Test
+    public void shouldCommitEarlyIfNeeded() {
+        final long commitInterval = 1000L;
+        final Properties props = configProps(false);
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+
+        final StreamsConfig config = new StreamsConfig(props);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final Task runningTask = mock(Task.class);
+        final TaskManager taskManager = mockTaskManagerCommit(runningTask, 2);
+        when(taskManager.needsCommit(anyBoolean())).thenReturn(false);
+
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        when(taskManager.needsCommit(anyBoolean())).thenReturn(true);
+        mockTime.sleep(commitInterval - 10L);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        verify(taskManager, times(2)).commit(mkSet(runningTask));

Review Comment:
   I was wondering why the verification expects 2 invocations. I then understood that when `needsCommit()` returns `false`, the commit is still done because the second condition of the `if`-statement is `true` (i.e., it is the first commit). The second commit happens because `needsCommit()` returns `true`, while the second condition is `false`.
   
   This setup is not ideal because it does not test whether `needsCommit()` avoids an early commit when it returns `false`. So you need a third call to `maybeCommit()`: the first call just makes the first commit; the second call is within the commit interval (i.e., the second condition is `false`) and `needsCommit()` returns `false`; the third call is within the commit interval and `needsCommit()` returns `true`. You should also verify the calls to `taskManager.commit()` after each call to `thread.maybeCommit()` to be sure that the commit was or was not triggered in the correct case.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -874,6 +879,12 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.MEDIUM,
                     STATESTORE_CACHE_MAX_BYTES_DOC)
+            .define(STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG,
+                    Type.LONG,
+                    64 * 1024 * 1024L,
+                    atLeast(-1),
+                    Importance.MEDIUM,
+                    STATESTORE_UNCOMMITTED_MAX_BYTES_DOC)

Review Comment:
   Added a comment to `StreamsConfigTest`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
             if (task.commitNeeded()) {
                 task.clearTaskTimeout();
                 ++committed;
-                task.postCommit(false);
+                // under EOS, we need to enforce a checkpoint if our transaction buffers will exceed their capacity

Review Comment:
   Ah, OK! Got it!
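[Editor's note: to illustrate the three-call structure suggested for `shouldCommitEarlyIfNeeded` above, the commit phase of the test could be reshaped along the lines of the sketch below. It reuses the setup and helpers from the diff (e.g. `mockTaskManagerCommit`, `buildStreamThread`, `mkSet`); only the calls after the thread is built are shown.]

```java
        // first call: needsCommit() returns false, but the interval condition
        // holds because no commit has happened yet -> commit expected
        when(taskManager.needsCommit(anyBoolean())).thenReturn(false);
        thread.setNow(mockTime.milliseconds());
        thread.maybeCommit();
        verify(taskManager, times(1)).commit(mkSet(runningTask));

        // second call: within the commit interval and needsCommit() still
        // returns false -> no additional commit expected
        mockTime.sleep(commitInterval - 10L);
        thread.setNow(mockTime.milliseconds());
        thread.maybeCommit();
        verify(taskManager, times(1)).commit(mkSet(runningTask));

        // third call: still within the commit interval, but needsCommit()
        // now returns true -> an early commit is expected
        when(taskManager.needsCommit(anyBoolean())).thenReturn(true);
        thread.setNow(mockTime.milliseconds());
        thread.maybeCommit();
        verify(taskManager, times(2)).commit(mkSet(runningTask));
```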