C0urante commented on code in PR #12615:
URL: https://github.com/apache/kafka/pull/12615#discussion_r969819802


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java:
##########
@@ -78,155 +74,80 @@ public void tearDown() {
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
-        workerTask.initialize(TASK_CONFIG);
-        expectLastCall();
-
-        workerTask.initializeAndStart();
-        expectLastCall();
-
-        workerTask.execute();
-        expectLastCall();
-
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(workerTask);
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.run();
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
-        verify(workerTask);
+        verify(statusListener).onStartup(eq(taskId));
+        verify(statusListener).onShutdown(eq(taskId));
     }
 
     @Test
     public void stopBeforeStarting() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.close();
-        EasyMock.expectLastCall();
+            @Override
+            public void initializeAndStart() {
+                fail("This method is expected to not be invoked");
+            }
 
-        replay(workerTask);
+            @Override
+            public void execute() {
+                fail("This method is expected to not be invoked");
+            }
+        };
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
         // now run should not do anything
         workerTask.run();
-
-        verify(workerTask);
     }
 
     @Test
     public void cancelBeforeStopping() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
-
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
         final CountDownLatch stopped = new CountDownLatch(1);
-        final Thread thread = new Thread(() -> {
-            try {
-                stopped.await();
-            } catch (Exception e) {
-            }
-        });
-
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.initializeAndStart();
-        EasyMock.expectLastCall();
-
-        workerTask.execute();
-        expectLastCall().andAnswer(() -> {
-            thread.start();
-            return null;
-        });
 
-        statusListener.onStartup(taskId);
-        expectLastCall();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.close();
-        expectLastCall();
-
-        // there should be no call to onShutdown()
+            @Override
+            public void execute() {
+                try {
+                    stopped.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();

Review Comment:
   We should never get interrupted during the test; probably safer to fail here:
   ```suggestion
                       fail("Unexpectedly interrupted during test");
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java:
##########
@@ -78,155 +74,80 @@ public void tearDown() {
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
-        workerTask.initialize(TASK_CONFIG);
-        expectLastCall();
-
-        workerTask.initializeAndStart();
-        expectLastCall();
-
-        workerTask.execute();
-        expectLastCall();
-
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(workerTask);
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.run();
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
-        verify(workerTask);
+        verify(statusListener).onStartup(eq(taskId));
+        verify(statusListener).onShutdown(eq(taskId));

Review Comment:
   Nit: there's some inconsistency here in whether `eq(...)` is used to wrap 
verified arguments. I don't have a strong preference either way, but it's 
probably best to choose one style and stick with it in order to avoid 
spreading FUD.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java:
##########
@@ -78,155 +74,80 @@ public void tearDown() {
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
-        workerTask.initialize(TASK_CONFIG);
-        expectLastCall();
-
-        workerTask.initializeAndStart();
-        expectLastCall();
-
-        workerTask.execute();
-        expectLastCall();
-
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(workerTask);
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.run();
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
-        verify(workerTask);
+        verify(statusListener).onStartup(eq(taskId));
+        verify(statusListener).onShutdown(eq(taskId));
     }
 
     @Test
     public void stopBeforeStarting() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.close();
-        EasyMock.expectLastCall();
+            @Override
+            public void initializeAndStart() {
+                fail("This method is expected to not be invoked");
+            }
 
-        replay(workerTask);
+            @Override
+            public void execute() {
+                fail("This method is expected to not be invoked");
+            }
+        };
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
         // now run should not do anything
         workerTask.run();
-
-        verify(workerTask);
     }
 
     @Test
     public void cancelBeforeStopping() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
-
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
         final CountDownLatch stopped = new CountDownLatch(1);
-        final Thread thread = new Thread(() -> {
-            try {
-                stopped.await();
-            } catch (Exception e) {
-            }
-        });
-
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.initializeAndStart();
-        EasyMock.expectLastCall();
-
-        workerTask.execute();
-        expectLastCall().andAnswer(() -> {
-            thread.start();
-            return null;
-        });
 
-        statusListener.onStartup(taskId);
-        expectLastCall();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.close();
-        expectLastCall();
-
-        // there should be no call to onShutdown()
+            @Override
+            public void execute() {
+                try {
+                    stopped.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
 
-        replay(workerTask);
+            // Trigger task shutdown immediately after start. The task will 
block in it's execute() method
+            // until the stopped latch is counted down (i.e. it doesn't 
actually stop after stop is triggered).
+            @Override
+            public void initializeAndStart() {
+                stop();
+            }
+        };
 
         workerTask.initialize(TASK_CONFIG);
-        workerTask.run();
+        Thread t = new Thread(workerTask);
+        t.start();
 
-        workerTask.stop();
         workerTask.cancel();
         stopped.countDown();
-        thread.join();
+        t.join();
 
-        verify(workerTask);
+        verify(statusListener).onStartup(eq(taskId));
+        // there should be no call to onShutdown()
+        verify(statusListener, never()).onShutdown(eq(taskId));

Review Comment:
   There should be no more interactions whatsoever, right? It probably makes 
more sense to verify that than to only verify that we don't record a shutdown 
event:
   ```suggestion
           // there should be no other status updates, including shutdown
           verifyNoMoreInteractions(statusListener);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java:
##########
@@ -56,12 +53,11 @@ public class WorkerTaskTest {
     }
     private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
-    private ConnectMetrics metrics;
     @Mock private TaskStatus.Listener statusListener;
     @Mock private ClassLoader loader;
-    RetryWithToleranceOperator retryWithToleranceOperator;
-    @Mock
-    StatusBackingStore statusBackingStore;
+    @Mock StatusBackingStore statusBackingStore;

Review Comment:
   Nit: for consistency's sake, this should be private:
   ```suggestion
       @Mock private StatusBackingStore statusBackingStore;
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java:
##########
@@ -78,155 +74,80 @@ public void tearDown() {
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
-        workerTask.initialize(TASK_CONFIG);
-        expectLastCall();
-
-        workerTask.initializeAndStart();
-        expectLastCall();
-
-        workerTask.execute();
-        expectLastCall();
-
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(workerTask);
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.run();
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
-        verify(workerTask);
+        verify(statusListener).onStartup(eq(taskId));
+        verify(statusListener).onShutdown(eq(taskId));
     }
 
     @Test
     public void stopBeforeStarting() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.close();
-        EasyMock.expectLastCall();
+            @Override
+            public void initializeAndStart() {
+                fail("This method is expected to not be invoked");
+            }
 
-        replay(workerTask);
+            @Override
+            public void execute() {
+                fail("This method is expected to not be invoked");
+            }
+        };
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
         // now run should not do anything
         workerTask.run();
-
-        verify(workerTask);
     }
 
     @Test
     public void cancelBeforeStopping() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
-
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
         final CountDownLatch stopped = new CountDownLatch(1);
-        final Thread thread = new Thread(() -> {
-            try {
-                stopped.await();
-            } catch (Exception e) {
-            }
-        });
-
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.initializeAndStart();
-        EasyMock.expectLastCall();
-
-        workerTask.execute();
-        expectLastCall().andAnswer(() -> {
-            thread.start();
-            return null;
-        });
 
-        statusListener.onStartup(taskId);
-        expectLastCall();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, 
TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.close();
-        expectLastCall();
-
-        // there should be no call to onShutdown()
+            @Override
+            public void execute() {
+                try {
+                    stopped.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
 
-        replay(workerTask);
+            // Trigger task shutdown immediately after start. The task will 
block in it's execute() method

Review Comment:
   Nit:
   ```suggestion
               // Trigger task shutdown immediately after start. The task will 
block in its execute() method
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java:
##########
@@ -162,71 +121,53 @@ public void stopBeforeStarting() {
         // now run should not do anything
         workerTask.run();
 
-        verify(workerTask);
+        verify(workerTask).initialize(eq(TASK_CONFIG));
+        verify(workerTask).close();
+        verify(workerTask, never()).initializeAndStart();
+        verify(workerTask, never()).execute();
     }
 
     @Test
     public void cancelBeforeStopping() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics,
+        WorkerTask workerTask = mock(WorkerTask.class, withSettings()
+                .useConstructor(taskId, statusListener, TargetState.STARTED, 
loader, metrics,
                         retryWithToleranceOperator, Time.SYSTEM, 
statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
+                .defaultAnswer(CALLS_REAL_METHODS));
 
         final CountDownLatch stopped = new CountDownLatch(1);
-        final Thread thread = new Thread(() -> {
-            try {
-                stopped.await();
-            } catch (Exception e) {
-            }
-        });
 
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
+        doNothing().when(workerTask).initialize(any(TaskConfig.class));
 
-        workerTask.initializeAndStart();
-        EasyMock.expectLastCall();
-
-        workerTask.execute();
-        expectLastCall().andAnswer(() -> {
-            thread.start();
+        // Trigger task shutdown immediately after start. The task will block 
in it's execute() method
+        // until the stopped latch is counted down (i.e. it doesn't actually 
stop after stop is triggered).
+        doAnswer(i -> {
+            workerTask.stop();
             return null;
-        });
-
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        // there should be no call to onShutdown()

Review Comment:
   Oh wow, good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to