C0urante commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1014154097


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, 
committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", 
mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = 
ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), 
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), 
eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), 
eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());

Review Comment:
   We want to verify here that at least one `ERROR`-level log message was 
emitted by the `SourceTaskOffsetCommitter` instance during this part of the 
test.
   
   We can use the `LogCaptureAppender` class to do that exact thing, without 
installing a mock logger for the class:
   ```java
           try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
               committer.close(timeoutMs);
               assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("ERROR")));
           }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, 
committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", 
mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = 
ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), 
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), 
eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), 
eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Termination interrupted
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), 
eq(TimeUnit.MILLISECONDS)))
-                .andThrow(new InterruptedException());
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException());
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
+        verify(executor, times(2)).shutdown();
     }
 
     @Test
     public void testRemove() throws Exception {

Review Comment:
   We're losing some coverage guarantees by restructuring the tests like this. 
Right now we know for each sub-case (removing a task successfully, removing a 
cancelled one, and being interrupted while removing one) that we call `cancel` 
and `get` on the future. With the changes here, we do get guarantees that those 
methods are invoked at least once at some point in the `testRemove` method 
(thanks to strict stubbing), but we don't get finer-grained guarantees than 
that. And ensuring that those methods are invoked exactly when we expect is 
fairly valuable, so it'd be nice if we could keep that coverage.
   
   I think we can break this test case out into four separate cases:
   1. Removing a non-existent task
   2. Removing a task successfully
   3. Removing a task that gets cancelled
   4. Removing a task and getting interrupted while doing so
   
   And we can use a helper method to reduce duplication for the last three:
   ```java
   private void testRemove() {
       // Try to remove an existing task
       when(taskFuture.cancel(false)).thenReturn(false);
       when(taskFuture.isDone()).thenReturn(false);
       when(taskId.connector()).thenReturn("MyConnector");
       when(taskId.task()).thenReturn(1);
   
       committers.put(taskId, taskFuture);
       committer.remove(taskId);
   }
   ```
   
   Which would make each test case fairly brief. For example, case 2 might look 
like this:
   
   ```java
   @Test
   public void testRemoveSuccess() throws Exception {
       when(taskFuture.get()).thenReturn(null);
       testRemove();
       assertEquals(Collections.emptyMap(), committers);
   }
   ```
   
   This way, after each test case completes, strict stubbing will ensure that 
the expected methods were invoked on the future at least once. We could go 
further and explicitly verify that they were invoked exactly once, but up to 
you if you'd like to add that coverage or not.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, 
committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", 
mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = 
ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), 
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), 
eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), 
eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);

Review Comment:
   Can we add a `verify(executor, times(1)).shutdown();` after this line, to 
ensure that the executor has actually been shut down by this point?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, 
committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", 
mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = 
ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), 
eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), 
eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), 
eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Termination interrupted
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), 
eq(TimeUnit.MILLISECONDS)))
-                .andThrow(new InterruptedException());
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException());
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
+        verify(executor, times(2)).shutdown();
     }
 
     @Test
     public void testRemove() throws Exception {
         // Try to remove a non-existing task
-        PowerMock.replayAll();
-
         assertTrue(committers.isEmpty());
         committer.remove(taskId);
         assertTrue(committers.isEmpty());
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Try to remove an existing task
-        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(taskFuture.isDone()).andReturn(false);
-        EasyMock.expect(taskFuture.get()).andReturn(null);
-        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
-        EasyMock.expect(taskId.task()).andReturn(1);
-        PowerMock.replayAll();
+        when(taskFuture.cancel(false)).thenReturn(false);
+        when(taskFuture.isDone()).thenReturn(false);
+        when(taskFuture.get())
+                .thenReturn(null)
+                .thenThrow(new CancellationException())
+                .thenThrow(new InterruptedException());
+        when(taskId.connector()).thenReturn("MyConnector");
+        when(taskId.task()).thenReturn(1);
 
         committers.put(taskId, taskFuture);
         committer.remove(taskId);
         assertTrue(committers.isEmpty());
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Try to remove a cancelled task
-        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(taskFuture.isDone()).andReturn(false);
-        EasyMock.expect(taskFuture.get()).andThrow(new 
CancellationException());
-        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
-        EasyMock.expect(taskId.task()).andReturn(1);
-        mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());

Review Comment:
   We can also use the `LogCaptureAppender` class here. Will probably have to 
invoke 
`LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);`



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, 
committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", 
mockLog);

Review Comment:
   I'm not a fan of using Whitebox to mess around with private fields, but we 
should find a way to preserve the coverage guarantees that we get from these 
tests w/r/t logging. Left a comment below with a suggestion on how we can do 
that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to