shashankhs11 commented on code in PR #20818:
URL: https://github.com/apache/kafka/pull/20818#discussion_r2488591799
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3041,218 +3040,272 @@ public void
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
@Test
public void shouldCommitAllNeededTasksOnHandleRevocation() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
+ // revoked task that needs commit
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets00);
- task00.setCommitNeeded();
+ when(task00.commitNeeded()).thenReturn(true);
+ when(task00.prepareCommit(true)).thenReturn(offsets00);
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
+ // non revoked task that needs commit
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> offsets01 =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
- task01.setCommittableOffsetsAndMetadata(offsets01);
- task01.setCommitNeeded();
+ when(task01.commitNeeded()).thenReturn(true);
+ when(task01.prepareCommit(true)).thenReturn(offsets01);
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
- final Map<TopicPartition, OffsetAndMetadata> offsets02 =
singletonMap(t1p2, new OffsetAndMetadata(2L, null));
- task02.setCommittableOffsetsAndMetadata(offsets02);
+ // non revoked task that does NOT need commit
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .withInputPartitions(taskId02Partitions)
+ .inState(State.RUNNING)
+ .build();
+ when(task02.commitNeeded()).thenReturn(false);
- final StateMachineTask task10 = new StateMachineTask(taskId10,
taskId10Partitions, false, stateManager);
+ // standby task (not be affected by revocation)
+ final StandbyTask task03 = standbyTask(taskId03,
taskId03ChangelogPartitions)
+ .withInputPartitions(taskId03Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets
= new HashMap<>();
expectedCommittedOffsets.putAll(offsets00);
expectedCommittedOffsets.putAll(offsets01);
- final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions)
- );
-
- final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
- mkEntry(taskId10, taskId10Partitions)
- );
- when(consumer.assignment()).thenReturn(assignment);
-
- when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
- .thenReturn(asList(task00, task01, task02));
- when(standbyTaskCreator.createTasks(assignmentStandby))
- .thenReturn(singletonList(task10));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
- taskManager.handleAssignment(assignmentActive, assignmentStandby);
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
- assertThat(task10.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.commitNeeded, is(false));
- assertThat(task00.commitPrepared, is(true));
- assertThat(task01.commitNeeded, is(false));
- assertThat(task01.commitPrepared, is(true));
- assertThat(task02.commitPrepared, is(false));
- assertThat(task10.commitPrepared, is(false));
+ // both tasks needing commit had prepareCommit called
+ verify(task00).prepareCommit(true);
+ verify(task01).prepareCommit(true);
+ verify(task02, never()).prepareCommit(anyBoolean());
+ verify(task03, never()).prepareCommit(anyBoolean());
verify(consumer).commitSync(expectedCommittedOffsets);
+
+ // revoked task suspended
+ verify(task00).suspend();
+ verify(task00).postCommit(true);
+
+ // non-revoked task with commit was also post-committed (but not
suspended)
+ verify(task01).postCommit(false);
+ verify(task01, never()).suspend();
+
+ // task02 and task03 should not be affected
+ verify(task02, never()).postCommit(anyBoolean());
+ verify(task02, never()).suspend();
+ verify(task03, never()).postCommit(anyBoolean());
+ verify(task03, never()).suspend();
}
@Test
public void shouldNotCommitIfNoRevokedTasksNeedCommitting() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
-
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
- task01.setCommitNeeded();
+ // task00 being revoked, no commit needed
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
+ // task01 NOT being revoked, commit needed
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
- final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions)
- );
+ // task02 NOT being revoked, no commit needed
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .withInputPartitions(taskId02Partitions)
+ .inState(State.RUNNING)
+ .build();
- when(consumer.assignment()).thenReturn(assignment);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
- when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
- .thenReturn(asList(task00, task01, task02));
+ when(task00.commitNeeded()).thenReturn(false);
+ when(task01.commitNeeded()).thenReturn(true); // only task01 needs
commit
+ when(task02.commitNeeded()).thenReturn(false);
- taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.commitPrepared, is(false));
- assertThat(task01.commitPrepared, is(false));
- assertThat(task02.commitPrepared, is(false));
+ verify(task00, never()).prepareCommit(anyBoolean());
+ verify(task01, never()).prepareCommit(anyBoolean());
+ verify(task02, never()).prepareCommit(anyBoolean());
+
+ verify(task00).suspend();
+ verify(task01, never()).suspend();
+ verify(task02, never()).suspend();
}
@Test
public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
Review Comment:
This test exercises the same behavior as
`shouldNotCommitIfNoRevokedTasksNeedCommitting`.
I wasn't able to figure out how the behavior varies across
the different processing modes.
Just for my understanding: it would differ only if the revoked tasks need
committing, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]