lucasbru commented on code in PR #20544:
URL: https://github.com/apache/kafka/pull/20544#discussion_r2358251606
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4248,28 +4215,32 @@ public boolean maybePunctuateStreamTime() {
@Test
public void shouldPunctuateActiveTasks() {
Review Comment:
This test seems to be testing what it should. The only question is whether it
can be simplified (see below).
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4248,28 +4215,32 @@ public boolean maybePunctuateStreamTime() {
@Test
public void shouldPunctuateActiveTasks() {
- final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
- @Override
- public boolean maybePunctuateStreamTime() {
- return true;
- }
- @Override
- public boolean maybePunctuateSystemTime() {
- return true;
- }
- };
+ final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(task00.maybePunctuateStreamTime()).thenReturn(true);
+ when(task00.maybePunctuateSystemTime()).thenReturn(true);
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.activeTasks()).thenReturn(Set.of(task00));
- assertThat(task00.state(), is(Task.State.RUNNING));
+ when(stateUpdater.restoresActiveTasks()).thenReturn(false);
+ when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(false);
+ when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of());
+ when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of());
+
+ final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+
+ assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
Review Comment:
For my understanding - why do we actually need to call `checkStateUpdater`
here?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -261,7 +279,7 @@ public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+ final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Review Comment:
Honestly, these changes to `setUpTaskManager` are quite confusing, and I
don't understand why you made them.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2280,39 +2282,34 @@ public void suspend() {
}
@Test
- public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
- final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
-
- final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
- final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
+ public void shouldNotCommitCorruptedTasksOnTaskCorruptedException() {
Review Comment:
No, I don't think I agree with this.
The key point of this test is that non-corrupted tasks are still committed as
usual, while the offsets for the corrupted tasks are reset.
```
assertTrue(nonCorruptedTask.commitPrepared);
assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet()));
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
// check that we should not commit empty map either
verify(consumer, never()).commitSync(emptyMap());
verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
```
This is still a valid test!
But maybe we can skip the `handleAssignment` / complete-restoration part if
we immediately mock a RUNNING task?
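
As a rough illustration of that idea, here is a minimal, self-contained sketch in plain Java. Everything in it is hypothetical: `FakeTask` and `handleCorruption` are stand-ins invented for this example, not Kafka Streams classes and not the actual `TaskManager` logic. It only shows the shape of a test that starts from tasks that are already RUNNING and asserts the outcome described above (non-corrupted tasks are prepared for commit, corrupted tasks get their offsets reset).

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: none of these types are Kafka Streams classes.
final class CorruptionSketch {

    // Stand-in for a task that is already RUNNING, so no assignment or
    // restoration step is needed before exercising the corruption path.
    static final class FakeTask {
        final String id;
        final Set<String> inputPartitions;
        boolean commitPrepared = false;
        Set<String> partitionsForOffsetReset = Set.of();

        FakeTask(final String id, final Set<String> inputPartitions) {
            this.id = id;
            this.inputPartitions = inputPartitions;
        }
    }

    // Assumed behaviour for illustration only: corrupted tasks have their
    // input partitions marked for offset reset and are not committed;
    // every other running task is prepared for commit as usual.
    static void handleCorruption(final Collection<FakeTask> runningTasks,
                                 final Set<String> corruptedIds) {
        for (final FakeTask task : runningTasks) {
            if (corruptedIds.contains(task.id)) {
                task.partitionsForOffsetReset = task.inputPartitions;
            } else {
                task.commitPrepared = true;
            }
        }
    }

    public static void main(final String[] args) {
        final FakeTask corruptedTask = new FakeTask("0_0", Set.of("topic1-0"));
        final FakeTask nonCorruptedTask = new FakeTask("0_1", Set.of("topic1-1"));

        handleCorruption(List.of(corruptedTask, nonCorruptedTask), Set.of("0_0"));

        // Same invariants as the assertions quoted above.
        if (!nonCorruptedTask.commitPrepared) {
            throw new AssertionError("non-corrupted task should be committed");
        }
        if (!nonCorruptedTask.partitionsForOffsetReset.isEmpty()) {
            throw new AssertionError("non-corrupted task should not be reset");
        }
        if (!corruptedTask.partitionsForOffsetReset.equals(Set.of("topic1-0"))) {
            throw new AssertionError("corrupted task offsets should be reset");
        }
    }
}
```

In the real test the fakes would presumably be replaced by the mocked RUNNING tasks built with the helpers this PR already uses; the point is only that the assertions do not depend on the assignment and restoration steps.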