cadonna commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1474135551


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1343,6 +1348,17 @@ private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
 
+    private long getMaxUncommittedBytesPerThread(final int numStreamThreads) {

Review Comment:
   This method does not consider whether `default.state.isolation.level` is set to `READ_COMMITTED`. Of course, as long as `default.state.isolation.level` does not exist yet, we cannot consider it.
   Does passing `maxUncommittedBytesPerThread` already affect commit behavior? As far as I understand the code, it does not, because `approximateNumUncommittedBytes()` always returns 0 at the moment.
   Would it be better to introduce `default.state.isolation.level` first before we go on with this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,30 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    boolean needsCommit(final boolean updateDelta) {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceed or are *likely to exceed* the configured threshold
+        final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
+
+        final boolean needsCommit = uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
+        if (needsCommit) {
+            log.debug(
+                    "Needs commit because we will exceed max uncommitted bytes before next commit. max: {}, last: {}, current: {}, delta: {}",
+                    maxUncommittedStateBytes, lastUncommittedBytes, uncommittedBytes, deltaBytes
+            );

Review Comment:
   nit:
   ```suggestion
               log.debug(
                   "Needs commit because we will exceed max uncommitted bytes before next commit. max: {}, last: {}, current: {}, delta: {}",
                   maxUncommittedStateBytes, lastUncommittedBytes, uncommittedBytes, deltaBytes
               );
   ```
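   For illustration, the early-commit heuristic in this hunk can be modeled standalone. The class and parameter names below are illustrative only, not the actual Kafka Streams fields; this is a sketch of the predicate under discussion (commit early when the current uncommitted bytes, extrapolated by the growth observed since the last check, would exceed the configured bound, with a negative bound meaning "unbounded"):

   ```java
   // Standalone sketch of the size-based early-commit heuristic (illustrative,
   // not the real TaskManager code).
   public class EarlyCommitSketch {
       // Returns true if a commit should be forced before the next regular commit.
       public static boolean needsCommit(final long maxUncommittedBytes,
                                         final long lastUncommittedBytes,
                                         final long currentUncommittedBytes) {
           if (maxUncommittedBytes < 0) {
               return false; // transaction buffers are unbounded
           }
           // Growth since the last observation; never negative.
           final long deltaBytes = Math.max(0, currentUncommittedBytes - lastUncommittedBytes);
           // Extrapolate one more interval of the same growth.
           return currentUncommittedBytes + deltaBytes > maxUncommittedBytes;
       }

       public static void main(String[] args) {
           if (needsCommit(-1, 0, Long.MAX_VALUE)) throw new AssertionError("unbounded must never force a commit");
           if (needsCommit(100, 40, 60)) throw new AssertionError("60 + 20 = 80 <= 100, no commit needed");
           if (!needsCommit(100, 40, 80)) throw new AssertionError("80 + 40 = 120 > 100, commit needed");
           System.out.println("ok");
       }
   }
   ```

   Note that with `approximateUncommittedStateBytes()` currently always returning 0, this predicate can never fire, which is the point raised in the first comment above.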



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
             if (task.commitNeeded()) {
                 task.clearTaskTimeout();
                 ++committed;
-                task.postCommit(false);
+                // under EOS, we need to enforce a checkpoint if our transaction buffers will exceed their capacity

Review Comment:
   I am also missing unit tests for this case.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java:
##########
@@ -152,6 +153,13 @@ public List<S> allSegments(final boolean forward) {
         return result;
     }
 
+    @Override
+    public long approximateNumUncommittedBytes() {

Review Comment:
   I could not find unit tests for this.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1320,6 +1320,19 @@ public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
         assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
     }
 
+    @Test
+    public void testStateStoreMaxUncommittedBytesShouldAllowUnbounded() {
+        props.put(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG, -1);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(Long.valueOf(-1), config.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG));
+    }
+
+    @Test
+    public void shouldUseDefaultStateStoreMaxUncommittedBytesConfigWhenNoConfigIsSet() {
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(Long.valueOf(64 * 1024 * 1024), config.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG));
+    }
+

Review Comment:
   I think you need at least two more tests here: one that verifies that positive values can be set, and one that verifies that an exception is thrown when you set a value less than -1.
   
   I am wondering whether we should allow 0. At the moment, it does not make sense to me to set this config to 0. Maybe you should also write a test for the expected behavior with 0. In the end, unit tests should also document the code.
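   As a standalone sketch of the validation semantics being asked for here (modeling the effect of an `atLeast(-1)` range validator; class, method, and message below are illustrative, not the actual `StreamsConfig`/`ConfigDef` API):

   ```java
   // Illustrative model of the requested config validation: values >= -1 are
   // accepted (-1 meaning "unbounded"), anything below -1 is rejected.
   public class UncommittedBytesConfigSketch {
       public static long validate(final long maxUncommittedBytes) {
           if (maxUncommittedBytes < -1) {
               throw new IllegalArgumentException(
                   "value must be at least -1, got " + maxUncommittedBytes);
           }
           return maxUncommittedBytes;
       }

       public static void main(String[] args) {
           // Positive values and the sentinel -1 pass through.
           if (validate(10 * 1024 * 1024L) != 10 * 1024 * 1024L) throw new AssertionError();
           if (validate(-1) != -1) throw new AssertionError();
           // 0 is currently accepted by an atLeast(-1) check, even though its
           // usefulness is questionable -- the open question in the comment above.
           if (validate(0) != 0) throw new AssertionError();
           // Anything below -1 must be rejected.
           try {
               validate(-2);
               throw new AssertionError("expected rejection of -2");
           } catch (IllegalArgumentException expected) {
               // expected
           }
           System.out.println("ok");
       }
   }
   ```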



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########


Review Comment:
   There are ways to remember. You could tie a knot in your handkerchief or you could create a sub-ticket under KAFKA-14412. I leave it to you which option you use. 🙂



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1374,7 +1376,7 @@ public void signalResume() {
      */
     int maybeCommit() {
         final int committed;
-        if (now - lastCommitMs > commitTimeMs) {
+        if (taskManager.needsCommit(true) || now - lastCommitMs > commitTimeMs) {

Review Comment:
   We need both tests. The one in `StreamThreadTest` verifies that the commit is or is not triggered depending on the value returned from `needsCommit()`, and the one in `TaskManagerTest` verifies that `needsCommit()` returns the expected boolean value.
   
   Apropos `TaskManagerTest`: I am missing a unit test for `needsCommit()` there. Good that you are already working on it.
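   The gate being tested here is simple enough to model standalone: a commit is triggered when either the size-based `needsCommit()` signal fires or the commit interval has elapsed. The following is an illustrative truth-table sketch, not the actual `StreamThread` code:

   ```java
   // Illustrative model of the maybeCommit() gate: commit on size pressure OR
   // on an elapsed commit interval. Not the real StreamThread implementation.
   public class MaybeCommitSketch {
       static boolean shouldCommit(final boolean needsCommit,
                                   final long now,
                                   final long lastCommitMs,
                                   final long commitTimeMs) {
           return needsCommit || now - lastCommitMs > commitTimeMs;
       }

       public static void main(String[] args) {
           final long commitInterval = 1000L;
           // Interval elapsed: commit even if needsCommit() is false.
           if (!shouldCommit(false, 2000L, 0L, commitInterval)) throw new AssertionError();
           // Within the interval and needsCommit() false: no commit.
           if (shouldCommit(false, 500L, 0L, commitInterval)) throw new AssertionError();
           // Within the interval but needsCommit() true: early commit.
           if (!shouldCommit(true, 500L, 0L, commitInterval)) throw new AssertionError();
           System.out.println("ok");
       }
   }
   ```

   The `StreamThreadTest` case covers the first and third rows; the `TaskManagerTest` case covers how `needsCommit` itself is computed.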



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -404,4 +404,11 @@ public synchronized Map<TaskId, Task> allTasksPerId() {
     public boolean contains(final TaskId taskId) {
         return getTask(taskId) != null;
     }
+
+    @Override
+    public long approximateUncommittedStateBytes() {

Review Comment:
   I could not find unit tests for this.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1073,6 +1078,35 @@ int commit(final Collection<Task> tasksToCommit) {
         assertTrue(committed.get());
     }
 
+    @Test
+    public void shouldCommitEarlyIfNeeded() {
+        final long commitInterval = 1000L;
+        final Properties props = configProps(false);
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+
+        final StreamsConfig config = new StreamsConfig(props);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final Task runningTask = mock(Task.class);
+        final TaskManager taskManager = mockTaskManagerCommit(runningTask, 2);
+        when(taskManager.needsCommit(anyBoolean())).thenReturn(false);
+
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        when(taskManager.needsCommit(anyBoolean())).thenReturn(true);
+        mockTime.sleep(commitInterval - 10L);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        verify(taskManager, times(2)).commit(mkSet(runningTask));

Review Comment:
   I was wondering why the verification expects 2 invocations. I then understood that when `needsCommit()` returns `false`, the commit still happens because the second condition of the `if`-statement is `true` (i.e., it is the first commit). The second commit happens because `needsCommit()` returns `true`, while the second condition is `false`.
   
   This setup is not ideal because it does not test whether `needsCommit()` avoids an early commit when it returns `false`. So you need a third call to `maybeCommit()`: the first call just makes the first commit; the second call is within the commit interval (i.e., the second condition is `false`) and `needsCommit()` returns `false`; the third call is within the commit interval and `needsCommit()` returns `true`. You should also verify the calls to `taskManager.commit()` after each call to `thread.maybeCommit()` to be sure that the commit was or was not triggered in the correct case.
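   The three-call sequence suggested above can be sketched as a standalone simulation (a toy model with a commit counter, illustrative only, not the Mockito-based `StreamThreadTest`):

   ```java
   // Toy simulation of the suggested three-call test structure: call 1 commits
   // because the interval condition holds (first commit), call 2 is within the
   // interval with needsCommit() == false and must NOT commit, call 3 is within
   // the interval with needsCommit() == true and must commit early.
   public class ThreeCallCommitSketch {
       static final long COMMIT_INTERVAL = 1000L;
       long lastCommitMs = -COMMIT_INTERVAL - 1; // force the first call to commit
       int commits = 0;

       void maybeCommit(final long now, final boolean needsCommit) {
           if (needsCommit || now - lastCommitMs > COMMIT_INTERVAL) {
               commits++;
               lastCommitMs = now;
           }
       }

       public static void main(String[] args) {
           final ThreeCallCommitSketch thread = new ThreeCallCommitSketch();
           thread.maybeCommit(0L, false);   // first call: interval elapsed -> commit
           if (thread.commits != 1) throw new AssertionError("first call must commit");
           thread.maybeCommit(500L, false); // within interval, no size pressure -> no commit
           if (thread.commits != 1) throw new AssertionError("second call must not commit");
           thread.maybeCommit(990L, true);  // within interval, needsCommit() true -> early commit
           if (thread.commits != 2) throw new AssertionError("third call must commit early");
           System.out.println("ok");
       }
   }
   ```

   Verifying the commit count after each call, as done here, is exactly what the comment asks the real test to do with `verify(taskManager, times(n)).commit(...)`.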



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -874,6 +879,12 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.MEDIUM,
                     STATESTORE_CACHE_MAX_BYTES_DOC)
+            .define(STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG,
+                    Type.LONG,
+                    64 * 1024 * 1024L,
+                    atLeast(-1),
+                    Importance.MEDIUM,
+                    STATESTORE_UNCOMMITTED_MAX_BYTES_DOC)

Review Comment:
   Added a comment to `StreamsConfigTest`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
             if (task.commitNeeded()) {
                 task.clearTaskTimeout();
                 ++committed;
-                task.postCommit(false);
+                // under EOS, we need to enforce a checkpoint if our transaction buffers will exceed their capacity

Review Comment:
   Ah, OK! Got it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
