C0urante commented on code in PR #12615: URL: https://github.com/apache/kafka/pull/12615#discussion_r969819802
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java: ########## @@ -78,155 +74,80 @@ public void tearDown() { public void standardStartup() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - - workerTask.initialize(TASK_CONFIG); - expectLastCall(); - - workerTask.initializeAndStart(); - expectLastCall(); - - workerTask.execute(); - expectLastCall(); - - statusListener.onStartup(taskId); - expectLastCall(); - - workerTask.close(); - expectLastCall(); - - statusListener.onShutdown(taskId); - expectLastCall(); - - replay(workerTask); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore); workerTask.initialize(TASK_CONFIG); workerTask.run(); workerTask.stop(); workerTask.awaitStop(1000L); - verify(workerTask); + verify(statusListener).onStartup(eq(taskId)); + verify(statusListener).onShutdown(eq(taskId)); } @Test public void stopBeforeStarting() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, 
loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.close(); - EasyMock.expectLastCall(); + @Override + public void initializeAndStart() { + fail("This method is expected to not be invoked"); + } - replay(workerTask); + @Override + public void execute() { + fail("This method is expected to not be invoked"); + } + }; workerTask.initialize(TASK_CONFIG); workerTask.stop(); workerTask.awaitStop(1000L); // now run should not do anything workerTask.run(); - - verify(workerTask); } @Test public void cancelBeforeStopping() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - final CountDownLatch stopped = new CountDownLatch(1); - final Thread thread = new Thread(() -> { - try { - stopped.await(); - } catch (Exception e) { - } - }); - - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.initializeAndStart(); - EasyMock.expectLastCall(); - - workerTask.execute(); - expectLastCall().andAnswer(() -> { - thread.start(); - return null; - }); - 
statusListener.onStartup(taskId); - expectLastCall(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.close(); - expectLastCall(); - - // there should be no call to onShutdown() + @Override + public void execute() { + try { + stopped.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); Review Comment: We should never get interrupted during the test; probably safer to fail here: ```suggestion fail("Unexpectedly interrupted during test"); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java: ########## @@ -78,155 +74,80 @@ public void tearDown() { public void standardStartup() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - - workerTask.initialize(TASK_CONFIG); - expectLastCall(); - - workerTask.initializeAndStart(); - expectLastCall(); - - workerTask.execute(); - expectLastCall(); - - statusListener.onStartup(taskId); - expectLastCall(); - - workerTask.close(); - expectLastCall(); - - statusListener.onShutdown(taskId); - expectLastCall(); - - replay(workerTask); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore); workerTask.initialize(TASK_CONFIG); 
workerTask.run(); workerTask.stop(); workerTask.awaitStop(1000L); - verify(workerTask); + verify(statusListener).onStartup(eq(taskId)); + verify(statusListener).onShutdown(eq(taskId)); Review Comment: Nit: there's some inconsistency here in whether `eq(...)` is used to wrap verified arguments. I don't have a strong preference either way, but it's probably best to choose one style and stick with it in order to avoid spreading FUD. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java: ########## @@ -78,155 +74,80 @@ public void tearDown() { public void standardStartup() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - - workerTask.initialize(TASK_CONFIG); - expectLastCall(); - - workerTask.initializeAndStart(); - expectLastCall(); - - workerTask.execute(); - expectLastCall(); - - statusListener.onStartup(taskId); - expectLastCall(); - - workerTask.close(); - expectLastCall(); - - statusListener.onShutdown(taskId); - expectLastCall(); - - replay(workerTask); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore); workerTask.initialize(TASK_CONFIG); workerTask.run(); workerTask.stop(); workerTask.awaitStop(1000L); - verify(workerTask); + verify(statusListener).onStartup(eq(taskId)); + verify(statusListener).onShutdown(eq(taskId)); } @Test public void 
stopBeforeStarting() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.close(); - EasyMock.expectLastCall(); + @Override + public void initializeAndStart() { + fail("This method is expected to not be invoked"); + } - replay(workerTask); + @Override + public void execute() { + fail("This method is expected to not be invoked"); + } + }; workerTask.initialize(TASK_CONFIG); workerTask.stop(); workerTask.awaitStop(1000L); // now run should not do anything workerTask.run(); - - verify(workerTask); } @Test public void cancelBeforeStopping() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - final 
CountDownLatch stopped = new CountDownLatch(1); - final Thread thread = new Thread(() -> { - try { - stopped.await(); - } catch (Exception e) { - } - }); - - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.initializeAndStart(); - EasyMock.expectLastCall(); - - workerTask.execute(); - expectLastCall().andAnswer(() -> { - thread.start(); - return null; - }); - statusListener.onStartup(taskId); - expectLastCall(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.close(); - expectLastCall(); - - // there should be no call to onShutdown() + @Override + public void execute() { + try { + stopped.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } - replay(workerTask); + // Trigger task shutdown immediately after start. The task will block in it's execute() method + // until the stopped latch is counted down (i.e. it doesn't actually stop after stop is triggered). + @Override + public void initializeAndStart() { + stop(); + } + }; workerTask.initialize(TASK_CONFIG); - workerTask.run(); + Thread t = new Thread(workerTask); + t.start(); - workerTask.stop(); workerTask.cancel(); stopped.countDown(); - thread.join(); + t.join(); - verify(workerTask); + verify(statusListener).onStartup(eq(taskId)); + // there should be no call to onShutdown() + verify(statusListener, never()).onShutdown(eq(taskId)); Review Comment: There should be no more interactions whatsoever, right? 
Probably makes more sense to verify that than to just verify that we don't record a shutdown event: ```suggestion // there should be no other status updates, including shutdown verifyNoMoreInteractions(statusListener); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java: ########## @@ -56,12 +53,11 @@ public class WorkerTaskTest { } private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); - private ConnectMetrics metrics; @Mock private TaskStatus.Listener statusListener; @Mock private ClassLoader loader; - RetryWithToleranceOperator retryWithToleranceOperator; - @Mock - StatusBackingStore statusBackingStore; + @Mock StatusBackingStore statusBackingStore; Review Comment: Nit: for consistency's sake, this should be private: ```suggestion @Mock private StatusBackingStore statusBackingStore; ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java: ########## @@ -78,155 +74,80 @@ public void tearDown() { public void standardStartup() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - - workerTask.initialize(TASK_CONFIG); - expectLastCall(); - - workerTask.initializeAndStart(); - expectLastCall(); - - workerTask.execute(); - expectLastCall(); - - statusListener.onStartup(taskId); - expectLastCall(); - - workerTask.close(); - expectLastCall(); - - statusListener.onShutdown(taskId); - 
expectLastCall(); - - replay(workerTask); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore); workerTask.initialize(TASK_CONFIG); workerTask.run(); workerTask.stop(); workerTask.awaitStop(1000L); - verify(workerTask); + verify(statusListener).onStartup(eq(taskId)); + verify(statusListener).onShutdown(eq(taskId)); } @Test public void stopBeforeStarting() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.close(); - EasyMock.expectLastCall(); + @Override + public void initializeAndStart() { + fail("This method is expected to not be invoked"); + } - replay(workerTask); + @Override + public void execute() { + fail("This method is expected to not be invoked"); + } + }; workerTask.initialize(TASK_CONFIG); workerTask.stop(); workerTask.awaitStop(1000L); // now run should not do anything workerTask.run(); - - verify(workerTask); } @Test public void cancelBeforeStopping() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - 
TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - final CountDownLatch stopped = new CountDownLatch(1); - final Thread thread = new Thread(() -> { - try { - stopped.await(); - } catch (Exception e) { - } - }); - - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.initializeAndStart(); - EasyMock.expectLastCall(); - - workerTask.execute(); - expectLastCall().andAnswer(() -> { - thread.start(); - return null; - }); - statusListener.onStartup(taskId); - expectLastCall(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.close(); - expectLastCall(); - - // there should be no call to onShutdown() + @Override + public void execute() { + try { + stopped.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } - replay(workerTask); + // Trigger task shutdown immediately after start. The task will block in it's execute() method Review Comment: Nit: ```suggestion // Trigger task shutdown immediately after start. 
The task will block in its execute() method ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java: ########## @@ -162,71 +121,53 @@ public void stopBeforeStarting() { // now run should not do anything workerTask.run(); - verify(workerTask); + verify(workerTask).initialize(eq(TASK_CONFIG)); + verify(workerTask).close(); + verify(workerTask, never()).initializeAndStart(); + verify(workerTask, never()).execute(); } @Test public void cancelBeforeStopping() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, + WorkerTask workerTask = mock(WorkerTask.class, withSettings() + .useConstructor(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); + .defaultAnswer(CALLS_REAL_METHODS)); final CountDownLatch stopped = new CountDownLatch(1); - final Thread thread = new Thread(() -> { - try { - stopped.await(); - } catch (Exception e) { - } - }); - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); + doNothing().when(workerTask).initialize(any(TaskConfig.class)); - workerTask.initializeAndStart(); - EasyMock.expectLastCall(); - - workerTask.execute(); - expectLastCall().andAnswer(() -> { - thread.start(); + // Trigger task shutdown immediately after start. The task will block in it's execute() method + // until the stopped latch is counted down (i.e. it doesn't actually stop after stop is triggered). 
+ doAnswer(i -> { + workerTask.stop(); return null; - }); - - statusListener.onStartup(taskId); - expectLastCall(); - - workerTask.close(); - expectLastCall(); - - // there should be no call to onShutdown() Review Comment: Oh wow, good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org