yashmayya commented on code in PR #12615: URL: https://github.com/apache/kafka/pull/12615#discussion_r969848670
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java: ########## @@ -78,155 +74,80 @@ public void tearDown() { public void standardStartup() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - - workerTask.initialize(TASK_CONFIG); - expectLastCall(); - - workerTask.initializeAndStart(); - expectLastCall(); - - workerTask.execute(); - expectLastCall(); - - statusListener.onStartup(taskId); - expectLastCall(); - - workerTask.close(); - expectLastCall(); - - statusListener.onShutdown(taskId); - expectLastCall(); - - replay(workerTask); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore); workerTask.initialize(TASK_CONFIG); workerTask.run(); workerTask.stop(); workerTask.awaitStop(1000L); - verify(workerTask); + verify(statusListener).onStartup(eq(taskId)); + verify(statusListener).onShutdown(eq(taskId)); } @Test public void stopBeforeStarting() { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, 
loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.close(); - EasyMock.expectLastCall(); + @Override + public void initializeAndStart() { + fail("This method is expected to not be invoked"); + } - replay(workerTask); + @Override + public void execute() { + fail("This method is expected to not be invoked"); + } + }; workerTask.initialize(TASK_CONFIG); workerTask.stop(); workerTask.awaitStop(1000L); // now run should not do anything workerTask.run(); - - verify(workerTask); } @Test public void cancelBeforeStopping() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId("foo", 0); - - WorkerTask workerTask = partialMockBuilder(WorkerTask.class) - .withConstructor( - ConnectorTaskId.class, - TaskStatus.Listener.class, - TargetState.class, - ClassLoader.class, - ConnectMetrics.class, - RetryWithToleranceOperator.class, - Time.class, - StatusBackingStore.class - ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, - retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) - .addMockedMethod("initialize") - .addMockedMethod("initializeAndStart") - .addMockedMethod("execute") - .addMockedMethod("close") - .createStrictMock(); - final CountDownLatch stopped = new CountDownLatch(1); - final Thread thread = new Thread(() -> { - try { - stopped.await(); - } catch (Exception e) { - } - }); - - workerTask.initialize(TASK_CONFIG); - EasyMock.expectLastCall(); - - workerTask.initializeAndStart(); - EasyMock.expectLastCall(); - - workerTask.execute(); - expectLastCall().andAnswer(() -> { - thread.start(); - return null; - }); - 
statusListener.onStartup(taskId); - expectLastCall(); + WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) { - workerTask.close(); - expectLastCall(); - - // there should be no call to onShutdown() + @Override + public void execute() { + try { + stopped.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } - replay(workerTask); + // Trigger task shutdown immediately after start. The task will block in it's execute() method Review Comment: Yikes, this is an embarrassing one, thanks 🙈 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org