cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1241990023


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], 
byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], 
byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }

Review Comment:
   I would inline this function since it became an one-liner.
   
   ```java
   when(consumer.assignment()).thenReturn(singleton(new 
TopicPartition("assignment", 0)));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2034,12 +1993,9 @@ public void 
shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         // `handleAssignment`
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
             .thenReturn(asList(corruptedTask, nonCorruptedTask));
-        expectRestoreToBeCompleted(consumer);
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        // check that we should not commit empty map either
-        consumer.commitSync(eq(emptyMap()));
-        expectLastCall().andStubThrow(new AssertionError("should not invoke 
commitSync when offset map is empty"));
-        replay(consumer);
+        expectAssignmentToBeCalled(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, 
taskId00Partitions);

Review Comment:
   Same here



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2449,17 +2398,15 @@ public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTask.state(), is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(State.RUNNING));
+        verifyResumeWasCalledWith(consumer, partitions);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2377,6 +2326,7 @@ public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTaskWithCommitNeeded.state(), 
is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(State.RUNNING));
+        verifyResumeWasCalledWith(consumer, partition);

Review Comment:
   Why do you verify? It was not verified in the original code.
   
   I added a couple of this comments below. However, I am not sure whether the 
original author did not want to verify the consumer or if they forgot about it. 
I will leave it to you if you want to keep them or not.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2805,6 +2716,7 @@ public void 
shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
 
         assertThat(task00.commitNeeded, is(true));
         assertThat(task10.commitPrepared, is(false));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2834,14 +2744,14 @@ public void 
shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
         taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
 
         assertThat(task00.commitNeeded, is(true));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();

Review Comment:
   Yeah, that seems weird...



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], 
byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], 
byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> 
consumer) {
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);
+    }
+
+    private static void verifyResumeWasCalledWith(final Consumer<byte[], 
byte[]> consumer, Set<TopicPartition> assignment) {

Review Comment:
   I would inline this function.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1998,10 +1957,10 @@ public void suspend() {
             }
         };
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, 
taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Same as above also here



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2541,6 +2482,7 @@ public void shouldAddNewActiveTasks() {
         assertThat(taskManager.activeTaskMap(), 
Matchers.equalTo(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(consumer).resume(emptySet());

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) 
{
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, 
taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Why do you union a single set? 
   ```suggestion
           when(consumer.assignment()).thenReturn(taskId00Partitions);
   ```
   ... and isn't this a duplicate of line 1923 (except the differing 
partitions).
   ```java
   expectAssignmentToBeCalled(consumer);
   ```
   Could it be that you can remove line 1923?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2469,19 +2416,18 @@ public void 
shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2494,17 +2440,16 @@ public void shouldAddNonResumedSuspendedTasks() {
         assertThat(task01.state(), is(Task.State.RUNNING));
 
         Mockito.verify(activeTaskCreator).createTasks(any(), 
Mockito.eq(emptyMap()));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions
             mkEntry(taskId02, taskId02Partitions)
             );
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
 
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
             .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
 
         final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        when(consumer.groupMetadata()).thenReturn(groupMetadata);
 
         doThrow(new 
TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, 
groupMetadata);
 
-        expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
-        replay(consumer, stateManager);

Review Comment:
   Did you miss to replay the state manager?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();
-
-        replay(consumer);
+        when(consumer.groupMetadata()).thenReturn(groupMetadata);
+        doNothing().when(producer).commitTransaction(expectedCommittedOffsets, 
groupMetadata);

Review Comment:
   I think something strange happened here. Not your fault. The producer is 
used like an EasyMock mock but it is specified as a Mockito mock. The producer 
does not do anything in the test as far as I can see.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4034,14 +3913,13 @@ public Set<TopicPartition> changelogPartitions() {
         };
 
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        replay(consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(false));
         assertThat(task00.state(), is(Task.State.RESTORING));
         // this could be a bit mysterious; we're verifying _no_ interactions 
on the consumer,
         // since the taskManager should _not_ resume the assignment while 
we're still in RESTORING
-        verify(consumer);
+        Mockito.verifyNoMoreInteractions(consumer);

Review Comment:
   ```suggestion
           Mockito.verifyNoInteractions(consumer);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2775,6 +2686,8 @@ public void 
shouldCommitAllNeededTasksOnHandleRevocation() {
         assertThat(task01.commitPrepared, is(true));
         assertThat(task02.commitPrepared, is(false));
         assertThat(task10.commitPrepared, is(false));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2629,19 +2558,16 @@ public void shouldSuspendActiveTasksDuringRevocation() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        consumer.commitSync(offsets);
-        expectLastCall();
-
-        replay(consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
         taskManager.handleRevocation(taskId00Partitions);
         assertThat(task00.state(), is(Task.State.SUSPENDED));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], 
byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], 
byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> 
consumer) {
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);

Review Comment:
   I would inline this function.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to