C0urante commented on code in PR #12823: URL: https://github.com/apache/kafka/pull/12823#discussion_r1014154097
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java: ########## @@ -71,122 +74,74 @@ public void setup() { Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); WorkerConfig config = new StandaloneConfig(workerProps); committer = new SourceTaskOffsetCommitter(config, executor, committers); - Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog); } @SuppressWarnings("unchecked") @Test public void testSchedule() { - Capture<Runnable> taskWrapper = EasyMock.newCapture(); + ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class); - EasyMock.expect(executor.scheduleWithFixedDelay( - EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), + when(executor.scheduleWithFixedDelay( + taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS)) - ).andReturn((ScheduledFuture) commitFuture); - - PowerMock.replayAll(); + ).thenReturn((ScheduledFuture) commitFuture); committer.schedule(taskId, task); - assertTrue(taskWrapper.hasCaptured()); assertNotNull(taskWrapper.getValue()); assertEquals(singletonMap(taskId, commitFuture), committers); - - PowerMock.verifyAll(); } @Test public void testClose() throws Exception { long timeoutMs = 1000; // Normal termination, where termination times out. - executor.shutdown(); - PowerMock.expectLastCall(); - - EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) - .andReturn(false); - mockLog.error(EasyMock.anyString()); Review Comment: We want to verify here that at least one `ERROR`-level log message was emitted by the `SourceTaskOffsetCommitter` instance during this part of the test. We can use the `LogCaptureAppender` class to do that exact thing, without installing a mock logger for the class: ```java try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) { committer.close(timeoutMs); assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("ERROR"))); } ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java: ########## @@ -71,122 +74,74 @@ public void setup() { Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); WorkerConfig config = new StandaloneConfig(workerProps); committer = new SourceTaskOffsetCommitter(config, executor, committers); - Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog); } @SuppressWarnings("unchecked") @Test public void testSchedule() { - Capture<Runnable> taskWrapper = EasyMock.newCapture(); + ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class); - EasyMock.expect(executor.scheduleWithFixedDelay( - EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), + when(executor.scheduleWithFixedDelay( + taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS)) - ).andReturn((ScheduledFuture) commitFuture); - - PowerMock.replayAll(); + ).thenReturn((ScheduledFuture) commitFuture); committer.schedule(taskId, task); - assertTrue(taskWrapper.hasCaptured()); assertNotNull(taskWrapper.getValue()); assertEquals(singletonMap(taskId, commitFuture), committers); - - PowerMock.verifyAll(); } @Test public void testClose() throws Exception { long timeoutMs = 1000; // Normal termination, where termination times out. - executor.shutdown(); - PowerMock.expectLastCall(); - - EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) - .andReturn(false); - mockLog.error(EasyMock.anyString()); - PowerMock.expectLastCall(); - PowerMock.replayAll(); + when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false); committer.close(timeoutMs); - PowerMock.verifyAll(); - PowerMock.resetAll(); - // Termination interrupted - executor.shutdown(); - PowerMock.expectLastCall(); - - EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) - .andThrow(new InterruptedException()); - PowerMock.replayAll(); + when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException()); committer.close(timeoutMs); - PowerMock.verifyAll(); + verify(executor, times(2)).shutdown(); } @Test public void testRemove() throws Exception { Review Comment: We're losing some coverage guarantees by restructuring the tests like this. Right now we know for each sub-case (removing a task successfully, removing a cancelled one, and being interrupted while removing one) that we call `cancel` and `get` on the future. With the changes here, we do get guarantees that those methods are invoked at least once at some point in the `testRemove` method (thanks to strict stubbing), but we don't get finer-grained guarantees than that. And ensuring that those methods are invoked exactly when we expect is fairly valuable, so it'd be nice if we could keep that coverage. I think we can break this test case out into four separate cases: 1. Removing a non-existent task 2. Removing a task successfully 3. Removing a task that gets cancelled 4. Removing a task and getting interrupted while doing so And we can use a helper method to reduce duplication for the last three: ```java private void testRemove() { // Try to remove an existing task when(taskFuture.cancel(false)).thenReturn(false); when(taskFuture.isDone()).thenReturn(false); when(taskId.connector()).thenReturn("MyConnector"); when(taskId.task()).thenReturn(1); committers.put(taskId, taskFuture); committer.remove(taskId); } ``` Which would make each test case fairly brief. For example, case 2 might look like this: ```java @Test public void testRemoveSuccess() throws Exception { when(taskFuture.get()).thenReturn(null); testRemove(); assertEquals(Collections.emptyMap(), committers); } ``` This way, after each test case completes, strict stubbing will ensure that the expected methods were invoked on the future at least once. We could go further and explicitly verify that they were invoked exactly once, but up to you if you'd like to add that coverage or not. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java: ########## @@ -71,122 +74,74 @@ public void setup() { Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); WorkerConfig config = new StandaloneConfig(workerProps); committer = new SourceTaskOffsetCommitter(config, executor, committers); - Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog); } @SuppressWarnings("unchecked") @Test public void testSchedule() { - Capture<Runnable> taskWrapper = EasyMock.newCapture(); + ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class); - EasyMock.expect(executor.scheduleWithFixedDelay( - EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), + when(executor.scheduleWithFixedDelay( + taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS)) - ).andReturn((ScheduledFuture) commitFuture); - - PowerMock.replayAll(); + ).thenReturn((ScheduledFuture) commitFuture); committer.schedule(taskId, task); - assertTrue(taskWrapper.hasCaptured()); assertNotNull(taskWrapper.getValue()); assertEquals(singletonMap(taskId, commitFuture), committers); - - PowerMock.verifyAll(); } @Test public void testClose() throws Exception { long timeoutMs = 1000; // Normal termination, where termination times out. - executor.shutdown(); - PowerMock.expectLastCall(); - - EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) - .andReturn(false); - mockLog.error(EasyMock.anyString()); - PowerMock.expectLastCall(); - PowerMock.replayAll(); + when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false); committer.close(timeoutMs); Review Comment: Can we add a `verify(executor, times(1)).shutdown();` after this line, to ensure that the executor has actually been shut down by this point? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java: ########## @@ -71,122 +74,74 @@ public void setup() { Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); WorkerConfig config = new StandaloneConfig(workerProps); committer = new SourceTaskOffsetCommitter(config, executor, committers); - Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog); } @SuppressWarnings("unchecked") @Test public void testSchedule() { - Capture<Runnable> taskWrapper = EasyMock.newCapture(); + ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class); - EasyMock.expect(executor.scheduleWithFixedDelay( - EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), + when(executor.scheduleWithFixedDelay( + taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS)) - ).andReturn((ScheduledFuture) commitFuture); - - PowerMock.replayAll(); + ).thenReturn((ScheduledFuture) commitFuture); committer.schedule(taskId, task); - assertTrue(taskWrapper.hasCaptured()); assertNotNull(taskWrapper.getValue()); assertEquals(singletonMap(taskId, commitFuture), committers); - - PowerMock.verifyAll(); } @Test public void testClose() throws Exception { long timeoutMs = 1000; // Normal termination, where termination times out. - executor.shutdown(); - PowerMock.expectLastCall(); - - EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) - .andReturn(false); - mockLog.error(EasyMock.anyString()); - PowerMock.expectLastCall(); - PowerMock.replayAll(); + when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false); committer.close(timeoutMs); - PowerMock.verifyAll(); - PowerMock.resetAll(); - // Termination interrupted - executor.shutdown(); - PowerMock.expectLastCall(); - - EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) - .andThrow(new InterruptedException()); - PowerMock.replayAll(); + when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException()); committer.close(timeoutMs); - PowerMock.verifyAll(); + verify(executor, times(2)).shutdown(); } @Test public void testRemove() throws Exception { // Try to remove a non-existing task - PowerMock.replayAll(); - assertTrue(committers.isEmpty()); committer.remove(taskId); assertTrue(committers.isEmpty()); - PowerMock.verifyAll(); - PowerMock.resetAll(); - // Try to remove an existing task - EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false); - EasyMock.expect(taskFuture.isDone()).andReturn(false); - EasyMock.expect(taskFuture.get()).andReturn(null); - EasyMock.expect(taskId.connector()).andReturn("MyConnector"); - EasyMock.expect(taskId.task()).andReturn(1); - PowerMock.replayAll(); + when(taskFuture.cancel(false)).thenReturn(false); + when(taskFuture.isDone()).thenReturn(false); + when(taskFuture.get()) + .thenReturn(null) + .thenThrow(new CancellationException()) + .thenThrow(new InterruptedException()); + when(taskId.connector()).thenReturn("MyConnector"); + when(taskId.task()).thenReturn(1); committers.put(taskId, taskFuture); committer.remove(taskId); assertTrue(committers.isEmpty()); - PowerMock.verifyAll(); - PowerMock.resetAll(); - // Try to remove a cancelled task - EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false); - EasyMock.expect(taskFuture.isDone()).andReturn(false); - EasyMock.expect(taskFuture.get()).andThrow(new CancellationException()); - EasyMock.expect(taskId.connector()).andReturn("MyConnector"); - EasyMock.expect(taskId.task()).andReturn(1); - mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject()); Review Comment: We can also use the `LogCaptureAppender` class here. Will probably have to invoke `LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java: ########## @@ -71,122 +74,74 @@ public void setup() { Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); WorkerConfig config = new StandaloneConfig(workerProps); committer = new SourceTaskOffsetCommitter(config, executor, committers); - Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog); Review Comment: I'm not a fan of using Whitebox to mess around with private fields, but we should find a way to preserve the coverage guarantees that we get from these tests w/r/t logging. Left a comment below with a suggestion on how we can do that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org