cadonna commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1474135551
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1343,6 +1348,17 @@ private long getCacheSizePerThread(final int numStreamThreads) {
        return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
    }
+    private long getMaxUncommittedBytesPerThread(final int numStreamThreads) {
Review Comment:
This method does not consider whether `default.state.isolation.level` is set to `READ_COMMITTED`. Of course, as long as we do not have `default.state.isolation.level`, we cannot consider it.
Does passing `maxUncommittedBytesPerThread` already affect commit behavior? As far as I understand the code, it does not, because `approximateNumUncommittedBytes()` always returns 0 at the moment.
Would it be better to first introduce `default.state.isolation.level` before we go on with this?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1836,6 +1842,30 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
        }
    }
+    boolean needsCommit(final boolean updateDelta) {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold
+        final long uncommittedBytes = tasks.approximateUncommittedStateBytes();
+
+        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
+
+        final boolean needsCommit = uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
+        if (needsCommit) {
+            log.debug(
+                "Needs commit because we will exceed max uncommitted bytes before next commit. max: {}, last: {}, current: {}, delta: {}",
+                maxUncommittedStateBytes, lastUncommittedBytes, uncommittedBytes, deltaBytes
+            );
Review Comment:
nit:
```suggestion
            log.debug(
                "Needs commit because we will exceed max uncommitted bytes before next commit. max: {}, last: {}, current: {}, delta: {}",
                maxUncommittedStateBytes, lastUncommittedBytes, uncommittedBytes, deltaBytes
            );
```
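For readers following the discussion: the early-commit heuristic in the `needsCommit()` hunk above can be modeled as a small standalone predicate. This is an illustrative sketch only, not the actual `TaskManager` code; the class name `EarlyCommitPredicate` is invented here, and unlike the PR (which takes an `updateDelta` flag) this model always updates its last observation.

```java
// Illustrative model of the early-commit heuristic under review
// (hypothetical class; not the actual Kafka Streams TaskManager code).
public class EarlyCommitPredicate {
    private final long maxUncommittedStateBytes; // -1 means unbounded
    private long lastUncommittedBytes;

    public EarlyCommitPredicate(final long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    // True if current usage, extrapolated one interval ahead by the growth
    // since the last check, would cross the configured threshold.
    public boolean needsCommit(final long uncommittedBytes) {
        if (maxUncommittedStateBytes < 0) {
            return false; // unbounded buffers: never force an early commit
        }
        final long deltaBytes = Math.max(0, uncommittedBytes - lastUncommittedBytes);
        lastUncommittedBytes = uncommittedBytes;
        return uncommittedBytes + deltaBytes > maxUncommittedStateBytes;
    }

    public static void main(final String[] args) {
        final EarlyCommitPredicate p = new EarlyCommitPredicate(100L);
        System.out.println(p.needsCommit(40L)); // 40 + 40 <= 100 -> false
        System.out.println(p.needsCommit(70L)); // 70 + 30 <= 100 -> false
        System.out.println(p.needsCommit(90L)); // 90 + 20 >  100 -> true
    }
}
```

Note that the prediction is one-sided: `Math.max(0, …)` ignores shrinking usage, so the predicate only ever over-estimates the next interval's growth.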
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
                if (task.commitNeeded()) {
                    task.clearTaskTimeout();
                    ++committed;
-                    task.postCommit(false);
+                    // under EOS, we need to enforce a checkpoint if our transaction buffers will exceed their capacity
Review Comment:
I am also missing unit tests for this case.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java:
##########
@@ -152,6 +153,13 @@ public List<S> allSegments(final boolean forward) {
        return result;
    }
+    @Override
+    public long approximateNumUncommittedBytes() {
Review Comment:
I could not find unit tests for this.
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1320,6 +1320,19 @@ public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
        assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
    }
+    @Test
+    public void testStateStoreMaxUncommittedBytesShouldAllowUnbounded() {
+        props.put(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG, -1);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(Long.valueOf(-1), config.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG));
+    }
+
+    @Test
+    public void shouldUseDefaultStateStoreMaxUncommittedBytesConfigWhenNoConfigIsSet() {
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(Long.valueOf(64 * 1024 * 1024), config.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG));
+    }
+
+
Review Comment:
I think you need at least two more tests here: one that verifies that you can set positive values, and one that verifies that an exception is thrown when you set a value less than -1.
I am wondering whether we should allow 0. At the moment, it does not make sense to me to set this config to 0. Maybe you should also write a test with the expected behavior for 0. In the end, unit tests should also document the code.
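As a sketch of the boundary cases this comment asks to cover: the config is defined with an `atLeast(-1)` range validator, whose behavior can be modeled standalone. The class and method names below are invented for illustration; the real checks would go through `StreamsConfig` and `ConfigDef`, not this helper.

```java
// Illustrative model of the atLeast(-1) range validation on the new config
// (hypothetical helper; the real validation happens in ConfigDef/StreamsConfig).
public class UncommittedBytesConfigSketch {
    static final long DEFAULT_MAX_UNCOMMITTED_BYTES = 64 * 1024 * 1024L;

    // Mirrors ConfigDef.Range.atLeast(-1): accept any value >= -1, reject the rest.
    static long validate(final long value) {
        if (value < -1) {
            throw new IllegalArgumentException("Value " + value + " must be at least -1");
        }
        return value;
    }

    public static void main(final String[] args) {
        System.out.println(validate(10 * 1024 * 1024L)); // positive values are accepted
        System.out.println(validate(-1L));               // -1 (unbounded) is accepted
        // 0 is currently accepted by atLeast(-1) -- the review questions whether it should be
        System.out.println(validate(0L));
        try {
            validate(-2L); // values below -1 are rejected
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The four cases in `main` correspond to the four tests the review asks for: positive, -1, 0 (behavior to be decided), and below -1.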
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
Review Comment:
There are ways to remember. You could tie a knot in your handkerchief or you
could create a sub-ticket under KAFKA-14412. I leave it to you what option you
use. 🙂
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1374,7 +1376,7 @@ public void signalResume() {
     */
    int maybeCommit() {
        final int committed;
-        if (now - lastCommitMs > commitTimeMs) {
+        if (taskManager.needsCommit(true) || now - lastCommitMs > commitTimeMs) {
Review Comment:
We need both tests. The one in `StreamThreadTest` verifies that the commit is triggered or not depending on the value returned from `needsCommit()`, and the one in `TaskManagerTest` verifies that `needsCommit()` returns the expected boolean value.
Apropos `TaskManagerTest`: I am missing a unit test for `needsCommit()` there. Good that you are already working on it.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -404,4 +404,11 @@ public synchronized Map<TaskId, Task> allTasksPerId() {
    public boolean contains(final TaskId taskId) {
        return getTask(taskId) != null;
    }
+
+    @Override
+    public long approximateUncommittedStateBytes() {
Review Comment:
I could not find unit tests for this.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1073,6 +1078,35 @@ int commit(final Collection<Task> tasksToCommit) {
        assertTrue(committed.get());
    }
+    @Test
+    public void shouldCommitEarlyIfNeeded() {
+        final long commitInterval = 1000L;
+        final Properties props = configProps(false);
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+
+        final StreamsConfig config = new StreamsConfig(props);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final Task runningTask = mock(Task.class);
+        final TaskManager taskManager = mockTaskManagerCommit(runningTask, 2);
+        when(taskManager.needsCommit(anyBoolean())).thenReturn(false);
+
+        final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        when(taskManager.needsCommit(anyBoolean())).thenReturn(true);
+        mockTime.sleep(commitInterval - 10L);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        verify(taskManager, times(2)).commit(mkSet(runningTask));
Review Comment:
I was wondering why the verification expects 2 invocations. I then
understood that when `needsCommit()` returns `false` the commit is done because
the second condition of the `if`-statement is `true` (i.e. it is the first
commit). The second commit happens because `needsCommit()` returns `true`, but
the second condition is `false`.
This setup is not ideal because it does not test whether `needsCommit()`
avoids an early commit when it returns `false`. So you need a third call to
`maybeCommit()`. The first call would just make the first commit. The second
call is within the commit interval (i.e., the second condition is `false`) and
`needsCommit()` returns `false` and the third call is within the commit
interval and `needsCommit()` returns `true`. You should also verify the call to
`taskManager.commit()` after each call to `thread.maybeCommit()` to be sure
that the call was triggered or not for the correct case.
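The three-call sequence described above can be illustrated with a toy model of `maybeCommit()`'s trigger condition. This is not the actual `StreamThread` code: the class `MaybeCommitSequence` is invented here, and it passes `needsCommit` in as a plain boolean instead of consulting a `TaskManager`.

```java
// Toy model of StreamThread.maybeCommit()'s trigger condition
// (hypothetical class, used only to illustrate the suggested test sequence).
public class MaybeCommitSequence {
    private final long commitTimeMs;
    private long lastCommitMs = -Long.MAX_VALUE / 2; // far in the past so the first call commits
    private int commits = 0;

    MaybeCommitSequence(final long commitTimeMs) {
        this.commitTimeMs = commitTimeMs;
    }

    // Mirrors: if (taskManager.needsCommit(true) || now - lastCommitMs > commitTimeMs)
    boolean maybeCommit(final long now, final boolean needsCommit) {
        if (needsCommit || now - lastCommitMs > commitTimeMs) {
            lastCommitMs = now;
            commits++;
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final MaybeCommitSequence t = new MaybeCommitSequence(1000L);
        // call 1: interval elapsed, needsCommit false -> commits (the "first commit")
        System.out.println(t.maybeCommit(0L, false));
        // call 2: within the interval, needsCommit false -> must NOT commit
        System.out.println(t.maybeCommit(500L, false));
        // call 3: within the interval, needsCommit true -> early commit
        System.out.println(t.maybeCommit(990L, true));
        System.out.println(t.commits); // 2 commits in total
    }
}
```

Call 2 is the one the current test is missing: it is the only call that shows `needsCommit() == false` actually *prevents* a commit while the interval has not elapsed.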
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -874,6 +879,12 @@ public class StreamsConfig extends AbstractConfig {
                    atLeast(0),
                    Importance.MEDIUM,
                    STATESTORE_CACHE_MAX_BYTES_DOC)
+            .define(STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG,
+                    Type.LONG,
+                    64 * 1024 * 1024L,
+                    atLeast(-1),
+                    Importance.MEDIUM,
+                    STATESTORE_UNCOMMITTED_MAX_BYTES_DOC)
Review Comment:
Added a comment to `StreamsConfigTest`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -157,7 +158,8 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
                if (task.commitNeeded()) {
                    task.clearTaskTimeout();
                    ++committed;
-                    task.postCommit(false);
+                    // under EOS, we need to enforce a checkpoint if our transaction buffers will exceed their capacity
Review Comment:
Ah, OK! Got it!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]